Server side speedtest implementation (#12750)

This commit is contained in:
Krishna Srinivas 2021-07-27 12:55:56 -07:00 committed by GitHub
parent 471b4fd0c9
commit aa0c28809b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 297 additions and 4 deletions

View file

@ -39,6 +39,7 @@ import (
"strings"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/gorilla/mux"
"github.com/klauspost/compress/zip"
"github.com/minio/kes"
@ -910,6 +911,52 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *
}
}
func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "SpeedtestHandler")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealAdminAction)
if objectAPI == nil {
return
}
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
sizeStr := r.URL.Query().Get(peerRESTSize)
durationStr := r.URL.Query().Get(peerRESTDuration)
concurrentStr := r.URL.Query().Get(peerRESTConcurrent)
size, err := strconv.Atoi(sizeStr)
if err != nil {
size = 64 * humanize.MiByte
}
concurrent, err := strconv.Atoi(concurrentStr)
if err != nil {
concurrent = 32
}
duration, err := time.ParseDuration(durationStr)
if err != nil {
duration = time.Second * 10
}
results := globalNotificationSys.Speedtest(ctx, size, concurrent, duration)
if err := json.NewEncoder(w).Encode(results); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
objectAPI.DeleteBucket(ctx, pathJoin(minioMetaSpeedTestBucket, minioMetaSpeedTestBucketPrefix), true)
w.(http.Flusher).Flush()
}
func validateAdminReq(ctx context.Context, w http.ResponseWriter, r *http.Request, action iampolicy.AdminAction) (ObjectLayer, auth.Credentials) {
var cred auth.Credentials
var adminAPIErr APIErrorCode

View file

@ -204,6 +204,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
Queries("paths", "{paths:.*}").HandlerFunc(gz(httpTraceHdrs(adminAPI.ForceUnlockHandler)))
}
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest").HandlerFunc(httpTraceHdrs(adminAPI.SpeedtestHandler))
// HTTP Trace
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/trace").HandlerFunc(gz(adminAPI.TraceHandler))

View file

@ -1427,3 +1427,42 @@ func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) chan Metric {
}(&wg, ch)
return ch
}
// Speedtest run GET/PUT tests at input concurrency for requested object size,
// optionally you can extend the tests longer with time.Duration.
func (sys *NotificationSys) Speedtest(ctx context.Context, size int, concurrent int, duration time.Duration) []madmin.SpeedtestResult {
results := make([]madmin.SpeedtestResult, len(sys.peerClients)+1)
var wg sync.WaitGroup
for index := range sys.peerClients {
if sys.peerClients[index] == nil {
continue
}
wg.Add(1)
go func(index int) {
defer wg.Done()
r, err := sys.peerClients[index].Speedtest(ctx, size, concurrent, duration)
results[index].Endpoint = sys.peerClients[index].String()
results[index].Err = err
if err == nil {
results[index].Uploads = r.Uploads
results[index].Downloads = r.Downloads
}
}(index)
}
wg.Add(1)
go func() {
defer wg.Done()
r, err := selfSpeedtest(ctx, size, concurrent, duration)
results[len(results)-1].Endpoint = globalMinioEndpoint
results[len(results)-1].Err = err
if err == nil {
results[len(results)-1].Uploads = r.Uploads
results[len(results)-1].Downloads = r.Downloads
}
}()
wg.Wait()
return results
}

View file

@ -61,7 +61,9 @@ const (
// MinIO tmp meta prefix.
minioMetaTmpBucket = minioMetaBucket + "/tmp"
// MinIO tmp meta prefix for deleted objects.
minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash"
minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash"
minioMetaSpeedTestBucket = minioMetaBucket + "/speedtest"
minioMetaSpeedTestBucketPrefix = "objects/"
// DNS separator (period), used for bucket name validation.
dnsDelimiter = "."

View file

@ -969,3 +969,21 @@ func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric
}(ch)
return ch, nil
}
func (client *peerRESTClient) Speedtest(ctx context.Context, size, concurrent int, duration time.Duration) (SpeedtestResult, error) {
values := make(url.Values)
values.Set(peerRESTSize, strconv.Itoa(size))
values.Set(peerRESTConcurrent, strconv.Itoa(concurrent))
values.Set(peerRESTDuration, duration.String())
respBody, err := client.callWithContext(context.Background(), peerRESTMethodSpeedtest, values, nil, -1)
if err != nil {
return SpeedtestResult{}, err
}
defer http.DrainBody(respBody)
dec := gob.NewDecoder(respBody)
var result SpeedtestResult
err = dec.Decode(&result)
return result, err
}

View file

@ -62,6 +62,7 @@ const (
peerRESTMethodUpdateMetacacheListing = "/updatemetacache"
peerRESTMethodGetPeerMetrics = "/peermetrics"
peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig"
peerRESTMethodSpeedtest = "/speedtest"
)
const (
@ -81,6 +82,9 @@ const (
peerRESTTraceS3 = "s3"
peerRESTTraceOS = "os"
peerRESTTraceThreshold = "threshold"
peerRESTSize = "size"
peerRESTConcurrent = "concurrent"
peerRESTDuration = "duration"
peerRESTListenBucket = "bucket"
peerRESTListenPrefix = "prefix"

View file

@ -19,6 +19,7 @@ package cmd
import (
"context"
"crypto/rand"
"encoding/gob"
"errors"
"fmt"
@ -27,12 +28,17 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/minio/madmin-go"
b "github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
"github.com/tinylib/msgp/msgp"
)
@ -1097,6 +1103,179 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request)
w.(http.Flusher).Flush()
}
// SpeedtestResult return value of the speedtest function
type SpeedtestResult struct {
Uploads uint64
Downloads uint64
}
// SpeedtestObject implements "random-read" object reader
type SpeedtestObject struct {
buf []byte
remaining int
}
func (bo *SpeedtestObject) Read(b []byte) (int, error) {
if bo.remaining == 0 {
return 0, io.EOF
}
if len(b) == 0 {
return 0, nil
}
if len(b) > len(bo.buf) {
b = b[:len(bo.buf)]
}
if len(b) > bo.remaining {
b = b[:bo.remaining]
}
copy(b, bo.buf)
bo.remaining -= len(b)
return len(b), nil
}
// Runs the speedtest on local MinIO process.
func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration) (SpeedtestResult, error) {
var result SpeedtestResult
objAPI := newObjectLayerFn()
if objAPI == nil {
return result, errServerNotInitialized
}
bucket := minioMetaSpeedTestBucket
buf := make([]byte, humanize.MiByte)
rand.Read(buf)
objCountPerThread := make([]uint64, concurrent)
var objUploadCount uint64
var objDownloadCount uint64
var wg sync.WaitGroup
doneCh1 := make(chan struct{})
go func() {
time.Sleep(duration)
close(doneCh1)
}()
objNamePrefix := minioMetaSpeedTestBucketPrefix + uuid.New().String()
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go func(i int) {
defer wg.Done()
for {
hashReader, err := hash.NewReader(&SpeedtestObject{buf, size},
int64(size), "", "", int64(size))
if err != nil {
logger.LogIf(ctx, err)
break
}
reader := NewPutObjReader(hashReader)
_, err = objAPI.PutObject(ctx, bucket, fmt.Sprintf("%s.%d.%d",
objNamePrefix, i, objCountPerThread[i]), reader, ObjectOptions{})
if err != nil {
logger.LogIf(ctx, err)
break
}
objCountPerThread[i]++
atomic.AddUint64(&objUploadCount, 1)
select {
case <-doneCh1:
return
default:
}
}
}(i)
}
wg.Wait()
doneCh2 := make(chan struct{})
go func() {
time.Sleep(duration)
close(doneCh2)
}()
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go func(i int) {
defer wg.Done()
var j uint64
for {
if objCountPerThread[i] == j {
j = 0
}
r, err := objAPI.GetObjectNInfo(ctx, bucket, fmt.Sprintf("%s.%d.%d",
objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{})
if err != nil {
logger.LogIf(ctx, err)
break
}
_, err = io.Copy(ioutil.Discard, r)
r.Close()
if err != nil {
logger.LogIf(ctx, err)
break
}
j++
atomic.AddUint64(&objDownloadCount, 1)
select {
case <-doneCh2:
return
default:
}
}
}(i)
}
wg.Wait()
return SpeedtestResult{objUploadCount, objDownloadCount}, nil
}
func (s *peerRESTServer) SpeedtestHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
}
objAPI := newObjectLayerFn()
if objAPI == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
sizeStr := r.URL.Query().Get(peerRESTSize)
durationStr := r.URL.Query().Get(peerRESTDuration)
concurrentStr := r.URL.Query().Get(peerRESTConcurrent)
size, err := strconv.Atoi(sizeStr)
if err != nil {
size = 64 * humanize.MiByte
}
concurrent, err := strconv.Atoi(concurrentStr)
if err != nil {
concurrent = 32
}
duration, err := time.ParseDuration(durationStr)
if err != nil {
duration = time.Second * 10
}
result, err := selfSpeedtest(r.Context(), size, concurrent, duration)
if err != nil {
s.writeErrorResponse(w, err)
return
}
enc := gob.NewEncoder(w)
if err := enc.Encode(result); err != nil {
s.writeErrorResponse(w, errors.New("Encoding report failed: "+err.Error()))
return
}
w.(http.Flusher).Flush()
}
// registerPeerRESTHandlers - register peer rest router.
func registerPeerRESTHandlers(router *mux.Router) {
server := &peerRESTServer{}
@ -1139,4 +1318,5 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodUpdateMetacacheListing).HandlerFunc(httpTraceHdrs(server.UpdateMetacacheListingHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetPeerMetrics).HandlerFunc(httpTraceHdrs(server.GetPeerMetrics))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler))
}

View file

@ -292,7 +292,7 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
}
// Create all necessary bucket folders if possible.
if err = p.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, dataUsageBucket); err != nil {
if err = p.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, dataUsageBucket, minioMetaSpeedTestBucket); err != nil {
return nil, err
}

2
go.mod
View file

@ -45,7 +45,7 @@ require (
github.com/minio/csvparser v1.0.0
github.com/minio/highwayhash v1.0.2
github.com/minio/kes v0.14.0
github.com/minio/madmin-go v1.0.17
github.com/minio/madmin-go v1.0.19
github.com/minio/minio-go/v7 v7.0.13-0.20210715203016-9e713532886e
github.com/minio/parquet-go v1.0.0
github.com/minio/pkg v1.0.10

3
go.sum
View file

@ -1023,8 +1023,9 @@ github.com/minio/kes v0.14.0 h1:plCGm4LwR++T1P1sXsJbyFRX54CE1WRuo9PAPj6MC3Q=
github.com/minio/kes v0.14.0/go.mod h1:OUensXz2BpgMfiogslKxv7Anyx/wj+6bFC6qA7BQcfA=
github.com/minio/madmin-go v1.0.6/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs=
github.com/minio/madmin-go v1.0.12/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs=
github.com/minio/madmin-go v1.0.17 h1:VMEn4nMKf0X3uNH0u+fZcn17KSwVkQGwyER/igG556E=
github.com/minio/madmin-go v1.0.17/go.mod h1:4nl9hvLWFnwCjkLfZSsZXEHgDODa2XSG6xGlIZyQ2oA=
github.com/minio/madmin-go v1.0.19 h1:XAp2rpo9OwzKAHIq5+EkAt148+lIeFyeo7cgVLNCWC8=
github.com/minio/madmin-go v1.0.19/go.mod h1:4nl9hvLWFnwCjkLfZSsZXEHgDODa2XSG6xGlIZyQ2oA=
github.com/minio/mc v0.0.0-20210626002108-cebf3318546f h1:hyFvo5hSFw2K417YvDr/vAKlgCG69uTuhZW/5LNdL0U=
github.com/minio/mc v0.0.0-20210626002108-cebf3318546f/go.mod h1:tuaonkPjVApCXkbtKENHBtsqUf7YTV33qmFrC+Pgp5g=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=