fix: add api level throttler for LIST calls
This commit is contained in:
parent
dc1a46e5d2
commit
5151c429e4
|
@ -36,7 +36,7 @@ const (
|
|||
|
||||
// adminAPIHandlers provides HTTP handlers for MinIO admin API.
|
||||
type adminAPIHandlers struct {
|
||||
mu sync.Mutex
|
||||
mu *sync.Mutex
|
||||
healSetsMap map[string]healInitSetParams
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,7 @@ type adminAPIHandlers struct {
|
|||
func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) {
|
||||
|
||||
adminAPI := adminAPIHandlers{
|
||||
mu: &sync.Mutex{},
|
||||
healSetsMap: make(map[string]healInitSetParams),
|
||||
}
|
||||
|
||||
|
|
|
@ -19,9 +19,14 @@ package cmd
|
|||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/minio/cmd/config/api"
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
"github.com/minio/minio/pkg/env"
|
||||
"github.com/minio/minio/pkg/wildcard"
|
||||
"github.com/rs/cors"
|
||||
)
|
||||
|
@ -48,6 +53,7 @@ func newCachedObjectLayerFn() CacheObjectLayer {
|
|||
type objectAPIHandlers struct {
|
||||
ObjectAPI func() ObjectLayer
|
||||
CacheAPI func() CacheObjectLayer
|
||||
Throttler map[string]chan struct{}
|
||||
}
|
||||
|
||||
// getHost tries its best to return the request host.
|
||||
|
@ -60,12 +66,39 @@ func getHost(r *http.Request) string {
|
|||
return r.Host
|
||||
}
|
||||
|
||||
// api throttler constants
|
||||
const (
|
||||
listAPI = "LIST"
|
||||
|
||||
granularDeadline = 10 * time.Second
|
||||
)
|
||||
|
||||
func parseThrottler(throttle string) map[string]chan struct{} {
|
||||
th := make(map[string]chan struct{})
|
||||
for _, v := range strings.Split(throttle, ";") {
|
||||
vs := strings.SplitN(v, "=", 2)
|
||||
if len(vs) == 2 {
|
||||
l, err := strconv.Atoi(vs[1])
|
||||
if err == nil {
|
||||
if l >= len(globalEndpoints.Hostnames()) {
|
||||
l /= len(globalEndpoints.Hostnames())
|
||||
} else {
|
||||
l = 1
|
||||
}
|
||||
th[vs[0]] = make(chan struct{}, l)
|
||||
}
|
||||
}
|
||||
}
|
||||
return th
|
||||
}
|
||||
|
||||
// registerAPIRouter - registers S3 compatible APIs.
|
||||
func registerAPIRouter(router *mux.Router) {
|
||||
// Initialize API.
|
||||
api := objectAPIHandlers{
|
||||
ObjectAPI: newObjectLayerFn,
|
||||
CacheAPI: newCachedObjectLayerFn,
|
||||
Throttler: parseThrottler(env.Get(api.EnvAPIRequestsGranularMax, "")),
|
||||
}
|
||||
|
||||
// API Router
|
||||
|
|
|
@ -67,7 +67,7 @@ func waitForLowHTTPReq(maxIO int, maxWait time.Duration) {
|
|||
// Bucket notification and http trace are not costly, it is okay to ignore them
|
||||
// while counting the number of concurrent connections
|
||||
maxIOFn := func() int {
|
||||
return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalHTTPTrace.NumSubscribers())
|
||||
return maxIO + globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers()
|
||||
}
|
||||
|
||||
if httpServer := newHTTPServerFn(); httpServer != nil {
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
|
@ -82,9 +83,27 @@ func validateListObjectsArgs(marker, delimiter, encodingType string, maxKeys int
|
|||
// of the versions of objects in a bucket.
|
||||
func (api objectAPIHandlers) ListObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "ListObjectVersions")
|
||||
|
||||
defer logger.AuditLog(w, r, "ListObjectVersions", mustGetClaimsFromToken(r))
|
||||
|
||||
pool := api.Throttler[listAPI]
|
||||
if pool != nil {
|
||||
deadlineTimer := time.NewTimer(granularDeadline)
|
||||
defer deadlineTimer.Stop()
|
||||
|
||||
select {
|
||||
case pool <- struct{}{}:
|
||||
defer func() { <-pool }()
|
||||
case <-deadlineTimer.C:
|
||||
// Send a http timeout message
|
||||
writeErrorResponse(ctx, w,
|
||||
errorCodes.ToAPIErr(ErrOperationMaxedOut),
|
||||
r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
|
||||
|
@ -119,6 +138,7 @@ func (api objectAPIHandlers) ListObjectVersionsHandler(w http.ResponseWriter, r
|
|||
if forwardStr == "" {
|
||||
forwardStr = bucket
|
||||
}
|
||||
|
||||
if proxyRequestByStringHash(ctx, w, r, forwardStr) {
|
||||
return
|
||||
}
|
||||
|
@ -152,9 +172,27 @@ func (api objectAPIHandlers) ListObjectVersionsHandler(w http.ResponseWriter, r
|
|||
// MinIO continues to support ListObjectsV1 and V2 for supporting legacy tools.
|
||||
func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "ListObjectsV2M")
|
||||
|
||||
defer logger.AuditLog(w, r, "ListObjectsV2M", mustGetClaimsFromToken(r))
|
||||
|
||||
pool := api.Throttler[listAPI]
|
||||
if pool != nil {
|
||||
deadlineTimer := time.NewTimer(granularDeadline)
|
||||
defer deadlineTimer.Stop()
|
||||
|
||||
select {
|
||||
case pool <- struct{}{}:
|
||||
defer func() { <-pool }()
|
||||
case <-deadlineTimer.C:
|
||||
// Send a http timeout message
|
||||
writeErrorResponse(ctx, w,
|
||||
errorCodes.ToAPIErr(ErrOperationMaxedOut),
|
||||
r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
|
||||
|
@ -229,9 +267,27 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt
|
|||
// MinIO continues to support ListObjectsV1 for supporting legacy tools.
|
||||
func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "ListObjectsV2")
|
||||
|
||||
defer logger.AuditLog(w, r, "ListObjectsV2", mustGetClaimsFromToken(r))
|
||||
|
||||
pool := api.Throttler[listAPI]
|
||||
if pool != nil {
|
||||
deadlineTimer := time.NewTimer(granularDeadline)
|
||||
defer deadlineTimer.Stop()
|
||||
|
||||
select {
|
||||
case pool <- struct{}{}:
|
||||
defer func() { <-pool }()
|
||||
case <-deadlineTimer.C:
|
||||
// Send a http timeout message
|
||||
writeErrorResponse(ctx, w,
|
||||
errorCodes.ToAPIErr(ErrOperationMaxedOut),
|
||||
r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
|
||||
|
@ -358,9 +414,27 @@ func proxyRequestByStringHash(ctx context.Context, w http.ResponseWriter, r *htt
|
|||
//
|
||||
func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "ListObjectsV1")
|
||||
|
||||
defer logger.AuditLog(w, r, "ListObjectsV1", mustGetClaimsFromToken(r))
|
||||
|
||||
pool := api.Throttler[listAPI]
|
||||
if pool != nil {
|
||||
deadlineTimer := time.NewTimer(granularDeadline)
|
||||
defer deadlineTimer.Stop()
|
||||
|
||||
select {
|
||||
case pool <- struct{}{}:
|
||||
defer func() { <-pool }()
|
||||
case <-deadlineTimer.C:
|
||||
// Send a http timeout message
|
||||
writeErrorResponse(ctx, w,
|
||||
errorCodes.ToAPIErr(ErrOperationMaxedOut),
|
||||
r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
|
||||
|
@ -393,6 +467,7 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
|
|||
if forwardStr == "" {
|
||||
forwardStr = bucket
|
||||
}
|
||||
|
||||
if proxyRequestByStringHash(ctx, w, r, forwardStr) {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ const (
|
|||
apiRemoteTransportDeadline = "remote_transport_deadline"
|
||||
|
||||
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
|
||||
EnvAPIRequestsGranularMax = "MINIO_API_REQUESTS_GRANULAR_MAX"
|
||||
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
|
||||
EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE"
|
||||
EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN"
|
||||
|
|
|
@ -85,34 +85,34 @@ func (t *apiConfig) getClusterDeadline() time.Duration {
|
|||
return t.clusterDeadline
|
||||
}
|
||||
|
||||
func (t *apiConfig) getRequestsPool() (chan struct{}, <-chan time.Time) {
|
||||
func (t *apiConfig) getRequestsPool() (chan struct{}, time.Duration) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
if t.requestsPool == nil {
|
||||
return nil, nil
|
||||
}
|
||||
if t.requestsDeadline <= 0 {
|
||||
return t.requestsPool, nil
|
||||
return nil, time.Duration(0)
|
||||
}
|
||||
|
||||
return t.requestsPool, time.NewTimer(t.requestsDeadline).C
|
||||
return t.requestsPool, t.requestsDeadline
|
||||
}
|
||||
|
||||
// maxClients throttles the S3 API calls
|
||||
func maxClients(f http.HandlerFunc) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
pool, deadlineTimer := globalAPIConfig.getRequestsPool()
|
||||
pool, deadline := globalAPIConfig.getRequestsPool()
|
||||
if pool == nil {
|
||||
f.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
deadlineTimer := time.NewTimer(deadline)
|
||||
defer deadlineTimer.Stop()
|
||||
|
||||
select {
|
||||
case pool <- struct{}{}:
|
||||
defer func() { <-pool }()
|
||||
f.ServeHTTP(w, r)
|
||||
case <-deadlineTimer:
|
||||
case <-deadlineTimer.C:
|
||||
// Send a http timeout message
|
||||
writeErrorResponse(r.Context(), w,
|
||||
errorCodes.ToAPIErr(ErrOperationMaxedOut),
|
||||
|
|
Loading…
Reference in a new issue