fix: use internode data for DisksInfo, VolsInfo in message pack (#10821)

Similar to #10775, this change aims for fewer memory allocations:
since we use getOnlineDisks() extensively for listing, we should
optimize it further.

Additionally, remove all unused walkers from the storage layer
Harshavardhana 2020-11-04 10:10:54 -08:00 committed by GitHub
parent 4a1efabda4
commit 1a1f00fa15
16 changed files with 875 additions and 929 deletions
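
For context, the heart of the wire change is replacing reflection-based encoding/gob with the msgp-generated codecs for DiskInfo and the new VolsInfo type. Below is a minimal round-trip sketch, not part of this commit, using only types and calls that appear in this diff (imports assumed: bytes, encoding/gob, github.com/tinylib/msgp/msgp):

// diskInfoWireRoundTrip is a hedged sketch comparing the old and new
// internode encodings for DiskInfo.
func diskInfoWireRoundTrip() error {
	info := DiskInfo{Total: 1000, Free: 800, Used: 200, FSType: "xfs"}
	var buf bytes.Buffer

	// Old path: gob builds encoder state reflectively on every call.
	if err := gob.NewEncoder(&buf).Encode(&info); err != nil {
		return err
	}

	// New path: the generated EncodeMsg/DecodeMsg methods write
	// msgpack directly, with far fewer allocations per call.
	buf.Reset()
	if err := msgp.Encode(&buf, &info); err != nil {
		return err
	}
	var out DiskInfo
	return msgp.Decode(&buf, &out)
}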

View file

@@ -88,11 +88,6 @@ type erasureSets struct {
disksStorageInfoCache timedValue
// Merge tree walk
pool *MergeWalkPool
poolSplunk *MergeWalkPool
poolVersions *MergeWalkVersionsPool
mrfMU sync.Mutex
mrfOperations map[healSource]int
}
@@ -356,9 +351,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
disksConnectEvent: make(chan diskConnectInfo),
distributionAlgo: format.Erasure.DistributionAlgo,
deploymentID: uuid.MustParse(format.ID),
pool: NewMergeWalkPool(globalMergeLookupTimeout),
poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout),
poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout),
mrfOperations: make(map[healSource]int),
}
@@ -926,10 +918,6 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
return lentry, lexicallySortedEntryCount, isTruncated
}
func (s *erasureSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh {
return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1, false)
}
func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh {
return s.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1)
}
@@ -964,42 +952,6 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
return entryChs
}
// Starts a walk channel across n disks and returns a slice of
// FileInfoCh which can be read from.
func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int, splunk bool) []FileInfoCh {
var entryChs []FileInfoCh
var wg sync.WaitGroup
var mutex sync.Mutex
for _, set := range s.sets {
// Reset for the next erasure set.
for _, disk := range set.getLoadBalancedNDisks(ndisks) {
wg.Add(1)
go func(disk StorageAPI) {
defer wg.Done()
var entryCh chan FileInfo
var err error
if splunk {
entryCh, err = disk.WalkSplunk(GlobalContext, bucket, prefix, marker, endWalkCh)
} else {
entryCh, err = disk.Walk(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
}
if err != nil {
// Disk walk returned error, ignore it.
return
}
mutex.Lock()
entryChs = append(entryChs, FileInfoCh{
Ch: entryCh,
})
mutex.Unlock()
}(disk)
}
}
wg.Wait()
return entryChs
}
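
For orientation, here is a minimal consumer sketch (hypothetical, not part of this commit) of the per-disk channels returned above; the real listing path merge-sorts these channels (see lexicallySortedEntryVersions above for the versioned variant), whereas this simply drains them:

// drainEntryChs reads every FileInfo from each per-disk channel.
func drainEntryChs(entryChs []FileInfoCh) []FileInfo {
	var entries []FileInfo
	for _, entryCh := range entryChs {
		// Each walker closes its channel when its disk's listing is
		// exhausted, so these range loops terminate on their own.
		for fi := range entryCh.Ch {
			entries = append(entries, fi)
		}
	}
	return entries
}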
func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
// In list multipart uploads we are going to treat the input prefix as the object,
// which means that we do not support directory navigation.

View file

@@ -1,353 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2019, 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"reflect"
"sync"
"time"
)
const (
globalMergeLookupTimeout = time.Minute * 1 // 1 minute.
)
// mergeWalkVersions - represents the go-routine that does the merge walk for versions.
type mergeWalkVersions struct {
added time.Time
entryChs []FileInfoVersionsCh
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
}
// mergeWalk - represents the go-routine that does the merge walk.
type mergeWalk struct {
added time.Time
entryChs []FileInfoCh
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
}
// MergeWalkVersionsPool - pool of mergeWalk go-routines.
// A mergeWalk is added to the pool by Set() and removed either by
// doing a Release() or when its associated timer goes off.
// mergeWalkPool's purpose is to maintain active mergeWalk go-routines in a map so that
// they can be looked up across related list calls.
type MergeWalkVersionsPool struct {
sync.Mutex
pool map[listParams][]mergeWalkVersions
timeOut time.Duration
}
// NewMergeWalkVersionsPool - initialize new tree walk pool for versions.
func NewMergeWalkVersionsPool(timeout time.Duration) *MergeWalkVersionsPool {
tPool := &MergeWalkVersionsPool{
pool: make(map[listParams][]mergeWalkVersions),
timeOut: timeout,
}
return tPool
}
// Release - similar to mergeWalkPool.Release but for versions.
func (t *MergeWalkVersionsPool) Release(params listParams) ([]FileInfoVersionsCh, chan struct{}) {
t.Lock()
defer t.Unlock()
walks, ok := t.pool[params] // Pick the valid walks.
if !ok || len(walks) == 0 {
// Release returns nil if params is not found.
return nil, nil
}
// Pop out the first valid walk entry.
walk := walks[0]
walks = walks[1:]
if len(walks) > 0 {
t.pool[params] = walks
} else {
delete(t.pool, params)
}
walk.endTimerCh <- struct{}{}
return walk.entryChs, walk.endWalkCh
}
// Set - similar to mergeWalkPool.Set but for file versions
func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersionsCh, endWalkCh chan struct{}) {
t.Lock()
defer t.Unlock()
// If we are above the limit delete at least one entry from the pool.
if len(t.pool) > treeWalkEntryLimit {
age := time.Now()
var oldest listParams
for k, v := range t.pool {
if len(v) == 0 {
delete(t.pool, k)
continue
}
// The first element is the oldest, so we only check that.
e := v[0]
if e.added.Before(age) {
oldest = k
age = e.added
}
}
// Invalidate and delete oldest.
if walks, ok := t.pool[oldest]; ok && len(walks) > 0 {
endCh := walks[0].endTimerCh
endWalkCh := walks[0].endWalkCh
if len(walks) > 1 {
// Move walks forward
copy(walks, walks[1:])
walks = walks[:len(walks)-1]
t.pool[oldest] = walks
} else {
// Only entry, just delete.
delete(t.pool, oldest)
}
select {
case endCh <- struct{}{}:
close(endWalkCh)
default:
}
} else {
// Shouldn't happen, but just in case.
delete(t.pool, oldest)
}
}
// Should be a buffered channel so that Release() never blocks.
endTimerCh := make(chan struct{}, 1)
walkInfo := mergeWalkVersions{
added: UTCNow(),
entryChs: resultChs,
endWalkCh: endWalkCh,
endTimerCh: endTimerCh,
}
// Append new walk info.
walks := t.pool[params]
if len(walks) < treeWalkSameEntryLimit {
t.pool[params] = append(walks, walkInfo)
} else {
// We are at limit, invalidate oldest, move list down and add new as last.
select {
case walks[0].endTimerCh <- struct{}{}:
close(walks[0].endWalkCh)
default:
}
copy(walks, walks[1:])
walks[len(walks)-1] = walkInfo
}
// Timer go-routine which times out after t.timeOut.
go func(endTimerCh <-chan struct{}, walkInfo mergeWalkVersions) {
select {
// Wait until timeOut
case <-time.After(t.timeOut):
// Timeout has expired. Remove the mergeWalk from mergeWalkPool and
// end the mergeWalk go-routine.
t.Lock()
defer t.Unlock()
walks, ok := t.pool[params]
if ok {
// Trick of filtering without allocating
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
nwalks := walks[:0]
// Look for walkInfo, remove it from the walks list.
for _, walk := range walks {
if !reflect.DeepEqual(walk, walkInfo) {
nwalks = append(nwalks, walk)
}
}
if len(nwalks) == 0 {
// No more mergeWalk go-routines associated with listParams
// hence remove map entry.
delete(t.pool, params)
} else {
// There are more mergeWalk go-routines associated with listParams
// hence save the list in the map.
t.pool[params] = nwalks
}
}
// Signal the mergeWalk go-routine to die.
close(endWalkCh)
case <-endTimerCh:
return
}
}(endTimerCh, walkInfo)
}
// MergeWalkPool - pool of mergeWalk go-routines.
// A mergeWalk is added to the pool by Set() and removed either by
// doing a Release() or when its associated timer goes off.
// mergeWalkPool's purpose is to maintain active mergeWalk go-routines in a map so that
// they can be looked up across related list calls.
type MergeWalkPool struct {
sync.Mutex
pool map[listParams][]mergeWalk
timeOut time.Duration
}
// NewMergeWalkPool - initialize new tree walk pool.
func NewMergeWalkPool(timeout time.Duration) *MergeWalkPool {
tPool := &MergeWalkPool{
pool: make(map[listParams][]mergeWalk),
timeOut: timeout,
}
return tPool
}
// Release - selects a mergeWalk from the pool based on the input
// listParams, removes it from the pool, and returns its entry
// channels along with the walk-end channel.
// Returns nil if listParams does not have an associated mergeWalk.
func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{}) {
t.Lock()
defer t.Unlock()
walks, ok := t.pool[params] // Pick the valid walks.
if !ok || len(walks) == 0 {
// Release returns nil if params is not found.
return nil, nil
}
// Pop out the first valid walk entry.
walk := walks[0]
walks[0] = mergeWalk{} // clear references.
walks = walks[1:]
if len(walks) > 0 {
t.pool[params] = walks
} else {
delete(t.pool, params)
}
walk.endTimerCh <- struct{}{}
return walk.entryChs, walk.endWalkCh
}
// Set - adds a mergeWalk to the mergeWalkPool.
// Also starts a timer go-routine that ends when:
// 1) time.After() expires after t.timeOut.
// The expiration is needed so that the mergeWalk go-routine's resources are freed after a timeout
// if the S3 client does only a partial listing of objects.
// 2) Release() signals the timer go-routine to end on endTimerCh.
// During listing the timer should not time out and end the mergeWalk go-routine, hence the
// timer go-routine should be ended.
func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh chan struct{}) {
t.Lock()
defer t.Unlock()
// If we are above the limit delete at least one entry from the pool.
if len(t.pool) > treeWalkEntryLimit {
age := time.Now()
var oldest listParams
for k, v := range t.pool {
if len(v) == 0 {
delete(t.pool, k)
continue
}
// The first element is the oldest, so we only check that.
e := v[0]
if e.added.Before(age) {
oldest = k
age = e.added
}
}
// Invalidate and delete oldest.
if walks, ok := t.pool[oldest]; ok && len(walks) > 0 {
endCh := walks[0].endTimerCh
endWalkCh := walks[0].endWalkCh
if len(walks) > 1 {
// Move walks forward
copy(walks, walks[1:])
walks = walks[:len(walks)-1]
t.pool[oldest] = walks
} else {
// Only entry, just delete.
delete(t.pool, oldest)
}
select {
case endCh <- struct{}{}:
close(endWalkCh)
default:
}
} else {
// Shouldn't happen, but just in case.
delete(t.pool, oldest)
}
}
// Should be a buffered channel so that Release() never blocks.
endTimerCh := make(chan struct{}, 1)
walkInfo := mergeWalk{
added: UTCNow(),
entryChs: resultChs,
endWalkCh: endWalkCh,
endTimerCh: endTimerCh,
}
// Append new walk info.
walks := t.pool[params]
if len(walks) < treeWalkSameEntryLimit {
t.pool[params] = append(walks, walkInfo)
} else {
// We are at limit, invalidate oldest, move list down and add new as last.
select {
case walks[0].endTimerCh <- struct{}{}:
close(walks[0].endWalkCh)
default:
}
copy(walks, walks[1:])
walks[len(walks)-1] = walkInfo
}
// Timer go-routine which times out after t.timeOut.
go func(endTimerCh <-chan struct{}, walkInfo mergeWalk) {
select {
// Wait until timeOut
case <-time.After(t.timeOut):
// Timeout has expired. Remove the mergeWalk from mergeWalkPool and
// end the mergeWalk go-routine.
t.Lock()
defer t.Unlock()
walks, ok := t.pool[params]
if ok {
// Trick of filtering without allocating
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
nwalks := walks[:0]
// Look for walkInfo, remove it from the walks list.
for _, walk := range walks {
if !reflect.DeepEqual(walk, walkInfo) {
nwalks = append(nwalks, walk)
}
}
if len(nwalks) == 0 {
// No more mergeWalk go-routines associated with listParams
// hence remove map entry.
delete(t.pool, params)
} else {
// There are more mergeWalk go-routines associated with listParams
// hence save the list in the map.
t.pool[params] = nwalks
}
}
// Signal the mergeWalk go-routine to die.
close(endWalkCh)
case <-endTimerCh:
return
}
}(endTimerCh, walkInfo)
}
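
Putting Set and Release together, here is a minimal usage sketch of how paginated list calls park and reclaim walkers; listParams{bucket: ...} mirrors the tests below, and the startWalks parameter is a hypothetical stand-in for startMergeWalksN:

// listOnePage is a hedged sketch of the pool's intended lifecycle.
func listOnePage(pool *MergeWalkPool, params listParams, startWalks func(chan struct{}) []FileInfoCh) {
	// Try to resume walkers parked by a previous page of this listing.
	entryChs, endWalkCh := pool.Release(params)
	if entryChs == nil {
		// Nothing parked: start fresh walks for these params.
		endWalkCh = make(chan struct{})
		entryChs = startWalks(endWalkCh)
	}

	// ... read one page of entries from entryChs ...

	// Park the walkers so the next continuation call can resume them;
	// the pool's timer go-routine frees them if the client never returns.
	pool.Set(params, entryChs, endWalkCh)
}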

View file

@@ -1,185 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2019,2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"testing"
"time"
)
// Test if the tree walker go-routine is removed from the pool after the timeout
// and that it is available in the pool before the timeout.
func TestMergeWalkPoolVersionsBasic(t *testing.T) {
// Create a treeWalkPool
tw := NewMergeWalkVersionsPool(1 * time.Second)
// Create sample params
params := listParams{
bucket: "test-bucket",
}
endWalkCh := make(chan struct{})
// Add a treeWalk to the pool
tw.Set(params, []FileInfoVersionsCh{}, endWalkCh)
// Wait for treeWalkPool timeout to happen
<-time.After(2 * time.Second)
if c1, _ := tw.Release(params); c1 != nil {
t.Error("treeWalk go-routine must have been freed")
}
// Add the treeWalk back to the pool
endWalkCh = make(chan struct{})
tw.Set(params, []FileInfoVersionsCh{}, endWalkCh)
// Release the treeWalk before timeout
select {
case <-time.After(1 * time.Second):
break
default:
if c1, _ := tw.Release(params); c1 == nil {
t.Error("treeWalk go-routine got freed before timeout")
}
}
}
// Test if the tree walker go-routine is removed from the pool after the timeout
// and that it is available in the pool before the timeout.
func TestMergeWalkPoolBasic(t *testing.T) {
// Create a treeWalkPool
tw := NewMergeWalkPool(1 * time.Second)
// Create sample params
params := listParams{
bucket: "test-bucket",
}
endWalkCh := make(chan struct{})
// Add a treeWalk to the pool
tw.Set(params, []FileInfoCh{}, endWalkCh)
// Wait for treeWalkPool timeout to happen
<-time.After(2 * time.Second)
if c1, _ := tw.Release(params); c1 != nil {
t.Error("treeWalk go-routine must have been freed")
}
// Add the treeWalk back to the pool
endWalkCh = make(chan struct{})
tw.Set(params, []FileInfoCh{}, endWalkCh)
// Release the treeWalk before timeout
select {
case <-time.After(1 * time.Second):
break
default:
if c1, _ := tw.Release(params); c1 == nil {
t.Error("treeWalk go-routine got freed before timeout")
}
}
}
// Test if multiple merge walkers for the same listParams are managed as expected by the pool.
func TestManyMergeWalksSameParam(t *testing.T) {
// Create a treeWalkPool.
tw := NewMergeWalkPool(5 * time.Second)
// Create sample params.
params := listParams{
bucket: "test-bucket",
}
select {
// This timeout is an upper-bound. This is started
// before the first treeWalk go-routine's timeout period starts.
case <-time.After(5 * time.Second):
break
default:
// Create many treeWalk go-routines for the same params.
for i := 0; i < treeWalkSameEntryLimit; i++ {
endWalkCh := make(chan struct{})
walkChs := make([]FileInfoCh, 0)
tw.Set(params, walkChs, endWalkCh)
}
tw.Lock()
if walks, ok := tw.pool[params]; ok {
if len(walks) != treeWalkSameEntryLimit {
t.Error("There aren't as many walks as were Set")
}
}
tw.Unlock()
for i := 0; i < treeWalkSameEntryLimit; i++ {
tw.Lock()
if walks, ok := tw.pool[params]; ok {
// Before ith Release we should have n-i treeWalk go-routines.
if treeWalkSameEntryLimit-i != len(walks) {
t.Error("There aren't as many walks as were Set")
}
}
tw.Unlock()
tw.Release(params)
}
}
}
// Test if multiple merge walkers for the same listParams are managed as expected by the pool
// but that treeWalkSameEntryLimit is respected.
func TestManyMergeWalksSameParamPrune(t *testing.T) {
// Create a treeWalkPool.
tw := NewMergeWalkPool(5 * time.Second)
// Create sample params.
params := listParams{
bucket: "test-bucket",
}
select {
// This timeout is an upper-bound. This is started
// before the first treeWalk go-routine's timeout period starts.
case <-time.After(5 * time.Second):
break
default:
// Create many treeWalk go-routines for the same params.
for i := 0; i < treeWalkSameEntryLimit*4; i++ {
endWalkCh := make(chan struct{})
walkChs := make([]FileInfoCh, 0)
tw.Set(params, walkChs, endWalkCh)
}
tw.Lock()
if walks, ok := tw.pool[params]; ok {
if len(walks) > treeWalkSameEntryLimit {
t.Error("There aren't as many walks as were Set")
}
}
tw.Unlock()
for i := 0; i < treeWalkSameEntryLimit; i++ {
tw.Lock()
if walks, ok := tw.pool[params]; ok {
// Before ith Release we should have n-i treeWalk go-routines.
if treeWalkSameEntryLimit-i != len(walks) {
t.Error("There aren't as many walks as were Set")
}
}
tw.Unlock()
tw.Release(params)
}
}
}

View file

@@ -150,13 +150,6 @@ func (d *naughtyDisk) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Wr
return d.disk.WalkDir(ctx, opts, wr)
}
func (d *naughtyDisk) WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if err := d.calcError(); err != nil {
return nil, err
}
return d.disk.WalkSplunk(ctx, volume, dirPath, marker, endWalkCh)
}
func (d *naughtyDisk) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
if err := d.calcError(); err != nil {
return nil, err
@@ -164,13 +157,6 @@ func (d *naughtyDisk) WalkVersions(ctx context.Context, volume, dirPath, marker
return d.disk.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh)
}
func (d *naughtyDisk) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if err := d.calcError(); err != nil {
return nil, err
}
return d.disk.Walk(ctx, volume, dirPath, marker, recursive, endWalkCh)
}
func (d *naughtyDisk) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
if err := d.calcError(); err != nil {
return []string{}, err

View file

@@ -22,6 +22,24 @@ import (
//go:generate msgp -file=$GOFILE
// DiskInfo is an extended type which returns current
// disk usage per path.
type DiskInfo struct {
Total uint64
Free uint64
Used uint64
FSType string
RootDisk bool
Healing bool
Endpoint string
MountPath string
ID string
Error string // carries the error over the network
}
// VolsInfo is a collection of volume (bucket) information
type VolsInfo []VolInfo
// VolInfo - represents volume stat information.
type VolInfo struct {
// Name of the volume.

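VolsInfo is introduced as a named slice type because Go methods, and therefore the msgp-generated codecs, can only be defined on a named type; a bare []VolInfo cannot implement msgp.Decodable. A minimal sketch of the decode the REST client performs later in this diff (respBody stands in for the response reader; io and msgp imports assumed):

// decodeVols is a hedged sketch, not part of this commit.
func decodeVols(respBody io.Reader) ([]VolInfo, error) {
	var vinfos VolsInfo // the named type carries the generated codecs
	if err := msgp.Decode(respBody, &vinfos); err != nil {
		return nil, err
	}
	return []VolInfo(vinfos), nil
}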
View file

@@ -6,6 +6,334 @@ import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Total":
z.Total, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "Total")
return
}
case "Free":
z.Free, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "Free")
return
}
case "Used":
z.Used, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "Used")
return
}
case "FSType":
z.FSType, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "FSType")
return
}
case "RootDisk":
z.RootDisk, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "RootDisk")
return
}
case "Healing":
z.Healing, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "Healing")
return
}
case "Endpoint":
z.Endpoint, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Endpoint")
return
}
case "MountPath":
z.MountPath, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "MountPath")
return
}
case "ID":
z.ID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "ID")
return
}
case "Error":
z.Error, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Error")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 10
// write "Total"
err = en.Append(0x8a, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
if err != nil {
return
}
err = en.WriteUint64(z.Total)
if err != nil {
err = msgp.WrapError(err, "Total")
return
}
// write "Free"
err = en.Append(0xa4, 0x46, 0x72, 0x65, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.Free)
if err != nil {
err = msgp.WrapError(err, "Free")
return
}
// write "Used"
err = en.Append(0xa4, 0x55, 0x73, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.Used)
if err != nil {
err = msgp.WrapError(err, "Used")
return
}
// write "FSType"
err = en.Append(0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65)
if err != nil {
return
}
err = en.WriteString(z.FSType)
if err != nil {
err = msgp.WrapError(err, "FSType")
return
}
// write "RootDisk"
err = en.Append(0xa8, 0x52, 0x6f, 0x6f, 0x74, 0x44, 0x69, 0x73, 0x6b)
if err != nil {
return
}
err = en.WriteBool(z.RootDisk)
if err != nil {
err = msgp.WrapError(err, "RootDisk")
return
}
// write "Healing"
err = en.Append(0xa7, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67)
if err != nil {
return
}
err = en.WriteBool(z.Healing)
if err != nil {
err = msgp.WrapError(err, "Healing")
return
}
// write "Endpoint"
err = en.Append(0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74)
if err != nil {
return
}
err = en.WriteString(z.Endpoint)
if err != nil {
err = msgp.WrapError(err, "Endpoint")
return
}
// write "MountPath"
err = en.Append(0xa9, 0x4d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68)
if err != nil {
return
}
err = en.WriteString(z.MountPath)
if err != nil {
err = msgp.WrapError(err, "MountPath")
return
}
// write "ID"
err = en.Append(0xa2, 0x49, 0x44)
if err != nil {
return
}
err = en.WriteString(z.ID)
if err != nil {
err = msgp.WrapError(err, "ID")
return
}
// write "Error"
err = en.Append(0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72)
if err != nil {
return
}
err = en.WriteString(z.Error)
if err != nil {
err = msgp.WrapError(err, "Error")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 10
// string "Total"
o = append(o, 0x8a, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = msgp.AppendUint64(o, z.Total)
// string "Free"
o = append(o, 0xa4, 0x46, 0x72, 0x65, 0x65)
o = msgp.AppendUint64(o, z.Free)
// string "Used"
o = append(o, 0xa4, 0x55, 0x73, 0x65, 0x64)
o = msgp.AppendUint64(o, z.Used)
// string "FSType"
o = append(o, 0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65)
o = msgp.AppendString(o, z.FSType)
// string "RootDisk"
o = append(o, 0xa8, 0x52, 0x6f, 0x6f, 0x74, 0x44, 0x69, 0x73, 0x6b)
o = msgp.AppendBool(o, z.RootDisk)
// string "Healing"
o = append(o, 0xa7, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67)
o = msgp.AppendBool(o, z.Healing)
// string "Endpoint"
o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74)
o = msgp.AppendString(o, z.Endpoint)
// string "MountPath"
o = append(o, 0xa9, 0x4d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68)
o = msgp.AppendString(o, z.MountPath)
// string "ID"
o = append(o, 0xa2, 0x49, 0x44)
o = msgp.AppendString(o, z.ID)
// string "Error"
o = append(o, 0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72)
o = msgp.AppendString(o, z.Error)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Total":
z.Total, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Total")
return
}
case "Free":
z.Free, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Free")
return
}
case "Used":
z.Used, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Used")
return
}
case "FSType":
z.FSType, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "FSType")
return
}
case "RootDisk":
z.RootDisk, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "RootDisk")
return
}
case "Healing":
z.Healing, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Healing")
return
}
case "Endpoint":
z.Endpoint, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Endpoint")
return
}
case "MountPath":
z.MountPath, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "MountPath")
return
}
case "ID":
z.ID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "ID")
return
}
case "Error":
z.Error, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Error")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *DiskInfo) Msgsize() (s int) {
s = 1 + 6 + msgp.Uint64Size + 5 + msgp.Uint64Size + 5 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.FSType) + 9 + msgp.BoolSize + 8 + msgp.BoolSize + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 10 + msgp.StringPrefixSize + len(z.MountPath) + 3 + msgp.StringPrefixSize + len(z.ID) + 6 + msgp.StringPrefixSize + len(z.Error)
return
}
// DecodeMsg implements msgp.Decodable
func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32
@@ -1080,3 +1408,170 @@ func (z VolInfo) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize
return
}
// DecodeMsg implements msgp.Decodable
func (z *VolsInfo) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if cap((*z)) >= int(zb0002) {
(*z) = (*z)[:zb0002]
} else {
(*z) = make(VolsInfo, zb0002)
}
for zb0001 := range *z {
var field []byte
_ = field
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
for zb0003 > 0 {
zb0003--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
switch msgp.UnsafeString(field) {
case "Name":
(*z)[zb0001].Name, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, zb0001, "Name")
return
}
case "Created":
(*z)[zb0001].Created, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, zb0001, "Created")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z VolsInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteArrayHeader(uint32(len(z)))
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0004 := range z {
// map header, size 2
// write "Name"
err = en.Append(0x82, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
if err != nil {
return
}
err = en.WriteString(z[zb0004].Name)
if err != nil {
err = msgp.WrapError(err, zb0004, "Name")
return
}
// write "Created"
err = en.Append(0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteTime(z[zb0004].Created)
if err != nil {
err = msgp.WrapError(err, zb0004, "Created")
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z VolsInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendArrayHeader(o, uint32(len(z)))
for zb0004 := range z {
// map header, size 2
// string "Name"
o = append(o, 0x82, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = msgp.AppendString(o, z[zb0004].Name)
// string "Created"
o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64)
o = msgp.AppendTime(o, z[zb0004].Created)
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *VolsInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if cap((*z)) >= int(zb0002) {
(*z) = (*z)[:zb0002]
} else {
(*z) = make(VolsInfo, zb0002)
}
for zb0001 := range *z {
var field []byte
_ = field
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
for zb0003 > 0 {
zb0003--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
switch msgp.UnsafeString(field) {
case "Name":
(*z)[zb0001].Name, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, zb0001, "Name")
return
}
case "Created":
(*z)[zb0001].Created, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, zb0001, "Created")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z VolsInfo) Msgsize() (s int) {
s = msgp.ArrayHeaderSize
for zb0004 := range z {
s += 1 + 5 + msgp.StringPrefixSize + len(z[zb0004].Name) + 8 + msgp.TimeSize
}
return
}
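
One practical use of the generated Msgsize methods: a caller can pre-size a buffer once and let MarshalMsg append into it without reallocating, which is exactly what the Append benchmarks in the next file measure. A minimal sketch, not part of this commit:

// marshalPreallocated marshals into a buffer sized by Msgsize.
func marshalPreallocated(info *DiskInfo) ([]byte, error) {
	buf := make([]byte, 0, info.Msgsize()) // upper-bound estimate
	return info.MarshalMsg(buf)            // appends without reallocating
}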

View file

@@ -9,6 +9,119 @@ import (
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalDiskInfo(t *testing.T) {
v := DiskInfo{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgDiskInfo(b *testing.B) {
v := DiskInfo{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgDiskInfo(b *testing.B) {
v := DiskInfo{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalDiskInfo(b *testing.B) {
v := DiskInfo{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeDiskInfo(t *testing.T) {
v := DiskInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeDiskInfo Msgsize() is inaccurate")
}
vn := DiskInfo{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeDiskInfo(b *testing.B) {
v := DiskInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeDiskInfo(b *testing.B) {
v := DiskInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalFileInfo(t *testing.T) {
v := FileInfo{}
bts, err := v.MarshalMsg(nil)
@@ -573,3 +686,116 @@ func BenchmarkDecodeVolInfo(b *testing.B) {
}
}
}
func TestMarshalUnmarshalVolsInfo(t *testing.T) {
v := VolsInfo{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgVolsInfo(b *testing.B) {
v := VolsInfo{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgVolsInfo(b *testing.B) {
v := VolsInfo{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalVolsInfo(b *testing.B) {
v := VolsInfo{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeVolsInfo(t *testing.T) {
v := VolsInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeVolsInfo Msgsize() is inaccurate")
}
vn := VolsInfo{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeVolsInfo(b *testing.B) {
v := VolsInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeVolsInfo(b *testing.B) {
v := VolsInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View file

@@ -1,3 +1,19 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
@@ -9,6 +25,116 @@ import (
"github.com/tinylib/msgp/msgp"
)
func BenchmarkDecodeDiskInfoMsgp(b *testing.B) {
v := DiskInfo{
Total: 1000,
Free: 1000,
Used: 1000,
FSType: "xfs",
RootDisk: true,
Healing: true,
Endpoint: "http://localhost:9001/tmp/disk1",
MountPath: "/tmp/disk1",
ID: "uuid",
Error: "",
}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(1)
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.Log("Size:", buf.Len(), "bytes")
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkDecodeDiskInfoGOB(b *testing.B) {
v := DiskInfo{
Total: 1000,
Free: 1000,
Used: 1000,
FSType: "xfs",
RootDisk: true,
Healing: true,
Endpoint: "http://localhost:9001/tmp/disk1",
MountPath: "/tmp/disk1",
ID: "uuid",
Error: "",
}
var buf bytes.Buffer
gob.NewEncoder(&buf).Encode(v)
encoded := buf.Bytes()
b.Log("Size:", buf.Len(), "bytes")
b.SetBytes(1)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
dec := gob.NewDecoder(bytes.NewBuffer(encoded))
err := dec.Decode(&v)
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkEncodeDiskInfoMsgp(b *testing.B) {
v := DiskInfo{
Total: 1000,
Free: 1000,
Used: 1000,
FSType: "xfs",
RootDisk: true,
Healing: true,
Endpoint: "http://localhost:9001/tmp/disk1",
MountPath: "/tmp/disk1",
ID: "uuid",
Error: "",
}
b.SetBytes(1)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := msgp.Encode(ioutil.Discard, &v)
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkEncodeDiskInfoGOB(b *testing.B) {
v := DiskInfo{
Total: 1000,
Free: 1000,
Used: 1000,
FSType: "xfs",
RootDisk: true,
Healing: true,
Endpoint: "http://localhost:9001/tmp/disk1",
MountPath: "/tmp/disk1",
ID: "uuid",
Error: "",
}
enc := gob.NewEncoder(ioutil.Discard)
b.SetBytes(1)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := enc.Encode(&v)
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkDecodeFileInfoMsgp(b *testing.B) {
v := FileInfo{Volume: "testbucket", Name: "src/compress/zlib/reader_test.go", VersionID: "", IsLatest: true, Deleted: false, DataDir: "5e0153cc-621a-4267-8cb6-4919140d53b3", XLV1: false, ModTime: UTCNow(), Size: 3430, Mode: 0x0, Metadata: map[string]string{"X-Minio-Internal-Server-Side-Encryption-Iv": "jIJPsrkkVYYMvc7edBrNl+7zcM7+ZwXqMb/YAjBO/ck=", "X-Minio-Internal-Server-Side-Encryption-S3-Kms-Key-Id": "my-minio-key", "X-Minio-Internal-Server-Side-Encryption-S3-Kms-Sealed-Key": "IAAfAP2p7ZLv3UpLwBnsKkF2mtWba0qoY42tymK0szRgGvAxBNcXyHXYooe9dQpeeEJWgKUa/8R61oCy1mFwIg==", "X-Minio-Internal-Server-Side-Encryption-S3-Sealed-Key": "IAAfAPFYRDkHVirJBJxBixNj3PLWt78dFuUTyTLIdLG820J7XqLPBO4gpEEEWw/DoTsJIb+apnaem+rKtQ1h3Q==", "X-Minio-Internal-Server-Side-Encryption-Seal-Algorithm": "DAREv2-HMAC-SHA256", "content-type": "application/octet-stream", "etag": "20000f00e2c3709dc94905c6ce31e1cadbd1c064e14acdcd44cf0ac2db777eeedd88d639fcd64de16851ade8b21a9a1a"}, Parts: []ObjectPartInfo{{ETag: "", Number: 1, Size: 3430, ActualSize: 3398}}, Erasure: ErasureInfo{Algorithm: "reedsolomon", DataBlocks: 2, ParityBlocks: 2, BlockSize: 10485760, Index: 3, Distribution: []int{3, 4, 1, 2}, Checksums: []ChecksumInfo{{PartNumber: 1, Algorithm: 0x3, Hash: []uint8{}}}}}
var buf bytes.Buffer

View file

@@ -53,10 +53,6 @@ type StorageAPI interface {
// WalkVersions in sorted order directly on disk.
WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error)
// Walk in sorted order directly on disk.
Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error)
// Walk in sorted order directly on disk.
WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error)
// Metadata operations
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error

View file

@@ -207,7 +207,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e
return info, err
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
err = msgp.Decode(respBody, &info)
if err != nil {
return info, err
}
@@ -247,8 +247,9 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo,
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&vols)
return vols, err
vinfos := VolsInfo(vols)
err = msgp.Decode(respBody, &vinfos)
return vinfos, err
}
// StatVol - get volume info over the network.
@@ -260,7 +261,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&vol)
err = msgp.Decode(respBody, &vol)
return vol, err
}
@@ -444,40 +445,6 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa
return int64(n), err
}
func (client *storageRESTClient) WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTDirPath, dirPath)
values.Set(storageRESTMarkerPath, marker)
respBody, err := client.call(ctx, storageRESTMethodWalkSplunk, values, nil, -1)
if err != nil {
return nil, err
}
ch := make(chan FileInfo)
go func() {
defer close(ch)
defer http.DrainBody(respBody)
decoder := msgp.NewReader(respBody)
for {
var fi FileInfo
if gerr := fi.DecodeMsg(decoder); gerr != nil {
// Upon error return
return
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
@@ -515,41 +482,6 @@ func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPa
return ch, nil
}
func (client *storageRESTClient) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTDirPath, dirPath)
values.Set(storageRESTMarkerPath, marker)
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
respBody, err := client.call(ctx, storageRESTMethodWalk, values, nil, -1)
if err != nil {
return nil, err
}
ch := make(chan FileInfo)
go func() {
defer close(ch)
defer http.DrainBody(respBody)
decoder := msgp.NewReader(respBody)
for {
var fi FileInfo
if gerr := fi.DecodeMsg(decoder); gerr != nil {
// Upon error return
return
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
// ListDir - lists a directory.
func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
values := make(url.Values)

View file

@@ -45,9 +45,7 @@ const (
storageRESTMethodReadFile = "/readfile"
storageRESTMethodReadFileStream = "/readfilestream"
storageRESTMethodListDir = "/listdir"
storageRESTMethodWalk = "/walk"
storageRESTMethodWalkVersions = "/walkversions"
storageRESTMethodWalkSplunk = "/walksplunk"
storageRESTMethodDeleteFile = "/deletefile"
storageRESTMethodDeleteVersions = "/deleteverions"
storageRESTMethodRenameFile = "/renamefile"

View file

@@ -154,7 +154,7 @@ func (s *storageRESTServer) DiskInfoHandler(w http.ResponseWriter, r *http.Reque
info.Error = err.Error()
}
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(info)
logger.LogIf(r.Context(), msgp.Encode(w, &info))
}
func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r *http.Request) {
@@ -219,8 +219,8 @@ func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Reque
s.writeErrorResponse(w, err)
return
}
gob.NewEncoder(w).Encode(&infos)
w.(http.Flusher).Flush()
defer w.(http.Flusher).Flush()
logger.LogIf(r.Context(), msgp.Encode(w, VolsInfo(infos)))
}
// StatVolHandler - stat a volume.
@@ -235,8 +235,8 @@ func (s *storageRESTServer) StatVolHandler(w http.ResponseWriter, r *http.Reques
s.writeErrorResponse(w, err)
return
}
gob.NewEncoder(w).Encode(info)
w.(http.Flusher).Flush()
defer w.(http.Flusher).Flush()
logger.LogIf(r.Context(), msgp.Encode(w, &info))
}
// DeleteVolumeHandler - delete a volume.
@@ -529,30 +529,6 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
w.(http.Flusher).Flush()
}
// WalkSplunkHandler - remote caller to start walking at a requested directory path.
func (s *storageRESTServer) WalkSplunkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
dirPath := vars[storageRESTDirPath]
markerPath := vars[storageRESTMarkerPath]
setEventStreamHeaders(w)
fch, err := s.storage.WalkSplunk(r.Context(), volume, dirPath, markerPath, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
}
encoder := msgp.NewWriter(w)
for fi := range fch {
logger.LogIf(r.Context(), fi.EncodeMsg(encoder))
}
logger.LogIf(r.Context(), encoder.Flush())
}
// WalkVersionsHandler - remote caller to start walking at a requested directory path.
func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@@ -582,35 +558,6 @@ func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.R
logger.LogIf(r.Context(), encoder.Flush())
}
// WalkHandler - remote caller to start walking at a requested directory path.
func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
dirPath := vars[storageRESTDirPath]
markerPath := vars[storageRESTMarkerPath]
recursive, err := strconv.ParseBool(vars[storageRESTRecursive])
if err != nil {
s.writeErrorResponse(w, err)
return
}
setEventStreamHeaders(w)
fch, err := s.storage.Walk(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
}
encoder := msgp.NewWriter(w)
for fi := range fch {
logger.LogIf(r.Context(), fi.EncodeMsg(encoder))
}
logger.LogIf(r.Context(), encoder.Flush())
}
// ListDirHandler - list a directory.
func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@@ -1119,10 +1066,6 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerSets Endpoint
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalk).HandlerFunc(httpTraceHdrs(server.WalkHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkSplunk).HandlerFunc(httpTraceHdrs(server.WalkSplunkHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkVersions).HandlerFunc(httpTraceHdrs(server.WalkVersionsHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive)...)

View file

@@ -149,22 +149,6 @@ func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath
return p.storage.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh)
}
func (p *xlStorageDiskIDCheck) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if err := p.checkDiskStale(); err != nil {
return nil, err
}
return p.storage.Walk(ctx, volume, dirPath, marker, recursive, endWalkCh)
}
func (p *xlStorageDiskIDCheck) WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if err := p.checkDiskStale(); err != nil {
return nil, err
}
return p.storage.WalkSplunk(ctx, volume, dirPath, marker, endWalkCh)
}
func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) {
if err := p.checkDiskStale(); err != nil {
return nil, err

View file

@@ -446,21 +446,6 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
return dataUsageInfo, nil
}
// DiskInfo is an extended type which returns current
// disk usage per path.
type DiskInfo struct {
Total uint64
Free uint64
Used uint64
FSType string
RootDisk bool
Healing bool
Endpoint string
MountPath string
ID string
Error string // carries the error over the network
}
// DiskInfo provides current information about disk space usage,
// total free inodes and underlying filesystem.
func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
@@ -853,79 +838,6 @@ func (s *xlStorage) isLeafDir(volume, leafPath string) bool {
return isDirEmpty(pathJoin(volumeDir, leafPath))
}
// WalkSplunk - is a sorted walker which returns file entries in lexically
// sorted order, additionally along with metadata about each of those entries.
// Implemented specifically for the Splunk backend structure and for List
// calls with the delimiter "guidSplunk".
func (s *xlStorage) WalkSplunk(ctx context.Context, volume, dirPath, marker string, endWalkCh <-chan struct{}) (ch chan FileInfo, err error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
}
return nil, err
}
ch = make(chan FileInfo)
go func() {
defer close(ch)
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
entries, err := s.ListDirSplunk(volume, dirPath, -1)
if err != nil {
return false, nil, false
}
if len(entries) == 0 {
return true, nil, false
}
entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, s.isLeafSplunk)
return false, entries, delayIsLeaf
}
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, true, listDir, s.isLeafSplunk, s.isLeafDir, endWalkCh)
for walkResult := range walkResultCh {
var fi FileInfo
if HasSuffix(walkResult.entry, SlashSeparator) {
fi = FileInfo{
Volume: volume,
Name: walkResult.entry,
Mode: uint32(os.ModeDir),
}
} else {
var err error
var xlMetaBuf []byte
xlMetaBuf, err = ioutil.ReadFile(pathJoin(volumeDir, walkResult.entry, xlStorageFormatFile))
if err != nil {
continue
}
fi, err = getFileInfo(xlMetaBuf, volume, walkResult.entry, "")
if err != nil {
continue
}
if fi.Deleted {
// Ignore delete markers.
continue
}
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
// WalkVersions - is a sorted walker which returns file entries in lexically sorted order,
// additionally along with metadata version info about each of those entries.
func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (ch chan FileInfoVersions, err error) {
@@ -1011,90 +923,6 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st
return ch, nil
}
// Walk - is a sorted walker which returns file entries in lexically
// sorted order, additionally along with metadata about each of those entries.
func (s *xlStorage) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (ch chan FileInfo, err error) {
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
}()
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
}
return nil, err
}
// Fast exit track to check if we are listing an object with
// a trailing slash; this avoids listing the object's contents.
if HasSuffix(dirPath, SlashSeparator) {
if st, err := os.Stat(pathJoin(volumeDir, dirPath, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() {
return nil, errFileNotFound
}
}
ch = make(chan FileInfo)
go func() {
defer close(ch)
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
entries, err := s.ListDir(ctx, volume, dirPath, -1)
if err != nil {
return false, nil, false
}
if len(entries) == 0 {
return true, nil, false
}
entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, s.isLeaf)
return false, entries, delayIsLeaf
}
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, s.isLeaf, s.isLeafDir, endWalkCh)
for walkResult := range walkResultCh {
var fi FileInfo
if HasSuffix(walkResult.entry, SlashSeparator) {
fi = FileInfo{
Volume: volume,
Name: walkResult.entry,
Mode: uint32(os.ModeDir),
}
} else {
var err error
var xlMetaBuf []byte
xlMetaBuf, err = ioutil.ReadFile(pathJoin(volumeDir, walkResult.entry, xlStorageFormatFile))
if err != nil {
continue
}
fi, err = getFileInfo(xlMetaBuf, volume, walkResult.entry, "")
if err != nil {
continue
}
if fi.Deleted {
// Ignore delete markers.
continue
}
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
// ListDir - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing SlashSeparator.
func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {