From 5151c429e434d8527361b36a6143d8e0293af6ec Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 28 Jan 2021 22:15:38 -0800 Subject: [PATCH] fix: add api level throttler for LIST calls --- cmd/admin-router.go | 3 +- cmd/api-router.go | 33 ++++++++++++ cmd/background-heal-ops.go | 2 +- cmd/bucket-listobjects-handlers.go | 83 ++++++++++++++++++++++++++++-- cmd/config/api/api.go | 1 + cmd/handler-api.go | 16 +++--- 6 files changed, 124 insertions(+), 14 deletions(-) diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 9d1c34818..34ec44cda 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -36,7 +36,7 @@ const ( // adminAPIHandlers provides HTTP handlers for MinIO admin API. type adminAPIHandlers struct { - mu sync.Mutex + mu *sync.Mutex healSetsMap map[string]healInitSetParams } @@ -44,6 +44,7 @@ type adminAPIHandlers struct { func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) { adminAPI := adminAPIHandlers{ + mu: &sync.Mutex{}, healSetsMap: make(map[string]healInitSetParams), } diff --git a/cmd/api-router.go b/cmd/api-router.go index f69945dc9..e2fe1c420 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -19,9 +19,14 @@ package cmd import ( "net" "net/http" + "strconv" + "strings" + "time" "github.com/gorilla/mux" + "github.com/minio/minio/cmd/config/api" xhttp "github.com/minio/minio/cmd/http" + "github.com/minio/minio/pkg/env" "github.com/minio/minio/pkg/wildcard" "github.com/rs/cors" ) @@ -48,6 +53,7 @@ func newCachedObjectLayerFn() CacheObjectLayer { type objectAPIHandlers struct { ObjectAPI func() ObjectLayer CacheAPI func() CacheObjectLayer + Throttler map[string]chan struct{} } // getHost tries its best to return the request host. @@ -60,12 +66,39 @@ func getHost(r *http.Request) string { return r.Host } +// api throttler constants +const ( + listAPI = "LIST" + + granularDeadline = 10 * time.Second +) + +func parseThrottler(throttle string) map[string]chan struct{} { + th := make(map[string]chan struct{}) + for _, v := range strings.Split(throttle, ";") { + vs := strings.SplitN(v, "=", 2) + if len(vs) == 2 { + l, err := strconv.Atoi(vs[1]) + if err == nil { + if l >= len(globalEndpoints.Hostnames()) { + l /= len(globalEndpoints.Hostnames()) + } else { + l = 1 + } + th[vs[0]] = make(chan struct{}, l) + } + } + } + return th +} + // registerAPIRouter - registers S3 compatible APIs. func registerAPIRouter(router *mux.Router) { // Initialize API. api := objectAPIHandlers{ ObjectAPI: newObjectLayerFn, CacheAPI: newCachedObjectLayerFn, + Throttler: parseThrottler(env.Get(api.EnvAPIRequestsGranularMax, "")), } // API Router diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 041345770..24b86e307 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -67,7 +67,7 @@ func waitForLowHTTPReq(maxIO int, maxWait time.Duration) { // Bucket notification and http trace are not costly, it is okay to ignore them // while counting the number of concurrent connections maxIOFn := func() int { - return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalHTTPTrace.NumSubscribers()) + return maxIO + globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers() } if httpServer := newHTTPServerFn(); httpServer != nil { diff --git a/cmd/bucket-listobjects-handlers.go b/cmd/bucket-listobjects-handlers.go index 30c09c899..4ed0be418 100644 --- a/cmd/bucket-listobjects-handlers.go +++ b/cmd/bucket-listobjects-handlers.go @@ -22,6 +22,7 @@ import ( "net/http" "strconv" "strings" + "time" "github.com/gorilla/mux" "github.com/minio/minio/cmd/logger" @@ -82,9 +83,27 @@ func validateListObjectsArgs(marker, delimiter, encodingType string, maxKeys int // of the versions of objects in a bucket. func (api objectAPIHandlers) ListObjectVersionsHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "ListObjectVersions") - defer logger.AuditLog(w, r, "ListObjectVersions", mustGetClaimsFromToken(r)) + pool := api.Throttler[listAPI] + if pool != nil { + deadlineTimer := time.NewTimer(granularDeadline) + defer deadlineTimer.Stop() + + select { + case pool <- struct{}{}: + defer func() { <-pool }() + case <-deadlineTimer.C: + // Send a http timeout message + writeErrorResponse(ctx, w, + errorCodes.ToAPIErr(ErrOperationMaxedOut), + r.URL, guessIsBrowserReq(r)) + return + case <-ctx.Done(): + return + } + } + vars := mux.Vars(r) bucket := vars["bucket"] @@ -119,6 +138,7 @@ func (api objectAPIHandlers) ListObjectVersionsHandler(w http.ResponseWriter, r if forwardStr == "" { forwardStr = bucket } + if proxyRequestByStringHash(ctx, w, r, forwardStr) { return } @@ -152,9 +172,27 @@ func (api objectAPIHandlers) ListObjectVersionsHandler(w http.ResponseWriter, r // MinIO continues to support ListObjectsV1 and V2 for supporting legacy tools. func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "ListObjectsV2M") - defer logger.AuditLog(w, r, "ListObjectsV2M", mustGetClaimsFromToken(r)) + pool := api.Throttler[listAPI] + if pool != nil { + deadlineTimer := time.NewTimer(granularDeadline) + defer deadlineTimer.Stop() + + select { + case pool <- struct{}{}: + defer func() { <-pool }() + case <-deadlineTimer.C: + // Send a http timeout message + writeErrorResponse(ctx, w, + errorCodes.ToAPIErr(ErrOperationMaxedOut), + r.URL, guessIsBrowserReq(r)) + return + case <-ctx.Done(): + return + } + } + vars := mux.Vars(r) bucket := vars["bucket"] @@ -229,9 +267,27 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt // MinIO continues to support ListObjectsV1 for supporting legacy tools. func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "ListObjectsV2") - defer logger.AuditLog(w, r, "ListObjectsV2", mustGetClaimsFromToken(r)) + pool := api.Throttler[listAPI] + if pool != nil { + deadlineTimer := time.NewTimer(granularDeadline) + defer deadlineTimer.Stop() + + select { + case pool <- struct{}{}: + defer func() { <-pool }() + case <-deadlineTimer.C: + // Send a http timeout message + writeErrorResponse(ctx, w, + errorCodes.ToAPIErr(ErrOperationMaxedOut), + r.URL, guessIsBrowserReq(r)) + return + case <-ctx.Done(): + return + } + } + vars := mux.Vars(r) bucket := vars["bucket"] @@ -358,9 +414,27 @@ func proxyRequestByStringHash(ctx context.Context, w http.ResponseWriter, r *htt // func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "ListObjectsV1") - defer logger.AuditLog(w, r, "ListObjectsV1", mustGetClaimsFromToken(r)) + pool := api.Throttler[listAPI] + if pool != nil { + deadlineTimer := time.NewTimer(granularDeadline) + defer deadlineTimer.Stop() + + select { + case pool <- struct{}{}: + defer func() { <-pool }() + case <-deadlineTimer.C: + // Send a http timeout message + writeErrorResponse(ctx, w, + errorCodes.ToAPIErr(ErrOperationMaxedOut), + r.URL, guessIsBrowserReq(r)) + return + case <-ctx.Done(): + return + } + } + vars := mux.Vars(r) bucket := vars["bucket"] @@ -393,6 +467,7 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http if forwardStr == "" { forwardStr = bucket } + if proxyRequestByStringHash(ctx, w, r, forwardStr) { return } diff --git a/cmd/config/api/api.go b/cmd/config/api/api.go index 3b054690e..c818ca0d2 100644 --- a/cmd/config/api/api.go +++ b/cmd/config/api/api.go @@ -36,6 +36,7 @@ const ( apiRemoteTransportDeadline = "remote_transport_deadline" EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" + EnvAPIRequestsGranularMax = "MINIO_API_REQUESTS_GRANULAR_MAX" EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE" EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN" diff --git a/cmd/handler-api.go b/cmd/handler-api.go index e023e042f..a2a352c78 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -85,34 +85,34 @@ func (t *apiConfig) getClusterDeadline() time.Duration { return t.clusterDeadline } -func (t *apiConfig) getRequestsPool() (chan struct{}, <-chan time.Time) { +func (t *apiConfig) getRequestsPool() (chan struct{}, time.Duration) { t.mu.RLock() defer t.mu.RUnlock() if t.requestsPool == nil { - return nil, nil - } - if t.requestsDeadline <= 0 { - return t.requestsPool, nil + return nil, time.Duration(0) } - return t.requestsPool, time.NewTimer(t.requestsDeadline).C + return t.requestsPool, t.requestsDeadline } // maxClients throttles the S3 API calls func maxClients(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - pool, deadlineTimer := globalAPIConfig.getRequestsPool() + pool, deadline := globalAPIConfig.getRequestsPool() if pool == nil { f.ServeHTTP(w, r) return } + deadlineTimer := time.NewTimer(deadline) + defer deadlineTimer.Stop() + select { case pool <- struct{}{}: defer func() { <-pool }() f.ServeHTTP(w, r) - case <-deadlineTimer: + case <-deadlineTimer.C: // Send a http timeout message writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrOperationMaxedOut),