From 02cfa774be569f9d46ac5e1689765a4a779a422f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 30 Oct 2020 12:20:28 -0700 Subject: [PATCH] allow requests to be proxied when server is booting up (#10790) when server is booting up there is a possibility that users might see '503' because object layer when not initialized, then the request is proxied to neighboring peers first one which is online. --- cmd/endpoint.go | 69 ++++++++++++++++++++++++++++++++++++++ cmd/generic-handlers.go | 32 ++++++++++++++++-- cmd/healthcheck-handler.go | 20 ++++++++--- cmd/http/headers.go | 6 ++++ cmd/prepare-storage.go | 21 +++++++----- cmd/routers.go | 4 +++ 6 files changed, 137 insertions(+), 15 deletions(-) diff --git a/cmd/endpoint.go b/cmd/endpoint.go index a16398729..50e7325c8 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -17,7 +17,9 @@ package cmd import ( + "context" "crypto/tls" + "errors" "fmt" "net" "net/http" @@ -34,6 +36,7 @@ import ( "github.com/dustin/go-humanize" "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/cmd/config" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/rest" "github.com/minio/minio/pkg/env" @@ -770,6 +773,72 @@ func GetProxyEndpointLocalIndex(proxyEps []ProxyEndpoint) int { return -1 } +func httpDo(clnt *http.Client, req *http.Request, f func(*http.Response, error) error) error { + ctx, cancel := context.WithTimeout(GlobalContext, 200*time.Millisecond) + defer cancel() + + // Run the HTTP request in a goroutine and pass the response to f. + c := make(chan error, 1) + req = req.WithContext(ctx) + go func() { c <- f(clnt.Do(req)) }() + select { + case <-ctx.Done(): + <-c // Wait for f to return. + return ctx.Err() + case err := <-c: + return err + } +} + +func getOnlineProxyEndpointIdx() int { + type reqIndex struct { + Request *http.Request + Idx int + } + + proxyRequests := make(map[*http.Client]reqIndex, len(globalProxyEndpoints)) + for i, proxyEp := range globalProxyEndpoints { + proxyEp := proxyEp + serverURL := &url.URL{ + Scheme: proxyEp.Scheme, + Host: proxyEp.Host, + Path: pathJoin(healthCheckPathPrefix, healthCheckLivenessPath), + } + + req, err := http.NewRequest(http.MethodGet, serverURL.String(), nil) + if err != nil { + continue + } + + proxyRequests[&http.Client{ + Transport: proxyEp.Transport, + }] = reqIndex{ + Request: req, + Idx: i, + } + } + + for c, r := range proxyRequests { + if err := httpDo(c, r.Request, func(resp *http.Response, err error) error { + if err != nil { + return err + } + xhttp.DrainBody(resp.Body) + if resp.StatusCode != http.StatusOK { + return errors.New(resp.Status) + } + if v := resp.Header.Get(xhttp.MinIOServerStatus); v == unavailable { + return errors.New(v) + } + return nil + }); err != nil { + continue + } + return r.Idx + } + return -1 +} + // GetProxyEndpoints - get all endpoints that can be used to proxy list request. func GetProxyEndpoints(endpointServerSets EndpointServerSets) ([]ProxyEndpoint, error) { var proxyEps []ProxyEndpoint diff --git a/cmd/generic-handlers.go b/cmd/generic-handlers.go index 2eb4f35a1..347112921 100644 --- a/cmd/generic-handlers.go +++ b/cmd/generic-handlers.go @@ -17,6 +17,7 @@ package cmd import ( + "context" "net/http" "strings" "time" @@ -157,15 +158,40 @@ const ( loginPathPrefix = SlashSeparator + "login" ) -// Adds redirect rules for incoming requests. type redirectHandler struct { handler http.Handler } -func setBrowserRedirectHandler(h http.Handler) http.Handler { +func setRedirectHandler(h http.Handler) http.Handler { return redirectHandler{handler: h} } +// Adds redirect rules for incoming requests. +type browserRedirectHandler struct { + handler http.Handler +} + +func setBrowserRedirectHandler(h http.Handler) http.Handler { + return browserRedirectHandler{handler: h} +} + +func (h redirectHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch { + case guessIsRPCReq(r), guessIsBrowserReq(r), guessIsHealthCheckReq(r), guessIsMetricsReq(r), isAdminReq(r): + h.handler.ServeHTTP(w, r) + return + case newObjectLayerFn() == nil: + // if this server is still initializing, proxy the request + // to any other online servers to avoid 503 for any incoming + // API calls. + if idx := getOnlineProxyEndpointIdx(); idx >= 0 { + proxyRequest(context.TODO(), w, r, globalProxyEndpoints[idx]) + return + } + } + h.handler.ServeHTTP(w, r) +} + // Fetch redirect location if urlPath satisfies certain // criteria. Some special names are considered to be // redirectable, this is purely internal function and @@ -236,7 +262,7 @@ func guessIsRPCReq(req *http.Request) bool { strings.HasPrefix(req.URL.Path, minioReservedBucketPath+SlashSeparator) } -func (h redirectHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h browserRedirectHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Re-direction is handled specifically for browser requests. if guessIsBrowserReq(r) { // Fetch the redirect location if any. diff --git a/cmd/healthcheck-handler.go b/cmd/healthcheck-handler.go index 8f1de552b..159a787a6 100644 --- a/cmd/healthcheck-handler.go +++ b/cmd/healthcheck-handler.go @@ -20,8 +20,12 @@ import ( "context" "net/http" "strconv" + + xhttp "github.com/minio/minio/cmd/http" ) +const unavailable = "offline" + // ClusterCheckHandler returns if the server is ready for requests. func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "ClusterCheckHandler") @@ -29,6 +33,7 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) { objLayer := newObjectLayerFn() // Service not initialized yet if objLayer == nil { + w.Header().Set(xhttp.MinIOServerStatus, unavailable) writeResponse(w, http.StatusServiceUnavailable, nil, mimeNone) return } @@ -39,12 +44,12 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) { opts := HealthOptions{Maintenance: r.URL.Query().Get("maintenance") == "true"} result := objLayer.Health(ctx, opts) if result.WriteQuorum > 0 { - w.Header().Set("X-Minio-Write-Quorum", strconv.Itoa(result.WriteQuorum)) + w.Header().Set(xhttp.MinIOWriteQuorum, strconv.Itoa(result.WriteQuorum)) } if !result.Healthy { // return how many drives are being healed if any if result.HealingDrives > 0 { - w.Header().Set("X-Minio-Healing-Drives", strconv.Itoa(result.HealingDrives)) + w.Header().Set(xhttp.MinIOHealingDrives, strconv.Itoa(result.HealingDrives)) } // As a maintenance call we are purposefully asked to be taken // down, this is for orchestrators to know if we can safely @@ -61,12 +66,19 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) { // ReadinessCheckHandler Checks if the process is up. Always returns success. func ReadinessCheckHandler(w http.ResponseWriter, r *http.Request) { - // TODO: only implement this function to notify that this pod is - // busy, at a local scope in future, for now '200 OK'. + if newObjectLayerFn() == nil { + // Service not initialized yet + w.Header().Set(xhttp.MinIOServerStatus, unavailable) + } + writeResponse(w, http.StatusOK, nil, mimeNone) } // LivenessCheckHandler - Checks if the process is up. Always returns success. func LivenessCheckHandler(w http.ResponseWriter, r *http.Request) { + if newObjectLayerFn() == nil { + // Service not initialized yet + w.Header().Set(xhttp.MinIOServerStatus, unavailable) + } writeResponse(w, http.StatusOK, nil, mimeNone) } diff --git a/cmd/http/headers.go b/cmd/http/headers.go index a9806a07b..24a20c15a 100644 --- a/cmd/http/headers.go +++ b/cmd/http/headers.go @@ -126,6 +126,12 @@ const ( // Header indicates if the etag should be preserved by client MinIOSourceETag = "x-minio-source-etag" + + // Writes expected write quorum + MinIOWriteQuorum = "x-minio-write-quorum" + + // Reports number of drives currently healing + MinIOHealingDrives = "x-minio-healing-drives" ) // Common http query params S3 API diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 2368d787f..01c3b7c13 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -23,7 +23,6 @@ import ( "net/http" "net/url" "os" - "path" "sync" "time" @@ -177,7 +176,7 @@ func IsServerResolvable(endpoint Endpoint) error { serverURL := &url.URL{ Scheme: endpoint.Scheme, Host: endpoint.Host, - Path: path.Join(healthCheckPathPrefix, healthCheckLivenessPath), + Path: pathJoin(healthCheckPathPrefix, healthCheckLivenessPath), } var tlsConfig *tls.Config @@ -195,9 +194,9 @@ func IsServerResolvable(endpoint Endpoint) error { &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: xhttp.NewCustomDialContext(3 * time.Second), - ResponseHeaderTimeout: 5 * time.Second, - TLSHandshakeTimeout: 5 * time.Second, - ExpectContinueTimeout: 5 * time.Second, + ResponseHeaderTimeout: 3 * time.Second, + TLSHandshakeTimeout: 3 * time.Second, + ExpectContinueTimeout: 3 * time.Second, TLSClientConfig: tlsConfig, // Go net/http automatically unzip if content-type is // gzip disable this feature, as we are always interested @@ -207,23 +206,29 @@ func IsServerResolvable(endpoint Endpoint) error { } defer httpClient.CloseIdleConnections() - ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(GlobalContext, 3*time.Second) req, err := http.NewRequestWithContext(ctx, http.MethodGet, serverURL.String(), nil) if err != nil { + cancel() return err } resp, err := httpClient.Do(req) + cancel() if err != nil { return err } - defer xhttp.DrainBody(resp.Body) + xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return StorageErr(resp.Status) } + + if resp.Header.Get(xhttp.MinIOServerStatus) == unavailable { + return StorageErr(unavailable) + } + return nil } diff --git a/cmd/routers.go b/cmd/routers.go index 467041ae5..fe47081ff 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -39,6 +39,10 @@ func registerDistErasureRouters(router *mux.Router, endpointServerSets EndpointS // List of some generic handlers which are applied for all incoming requests. var globalHandlers = []MiddlewareFunc{ + // add redirect handler to redirect + // requests when object layer is not + // initialized. + setRedirectHandler, // set x-amz-request-id header. addCustomHeaders, // set HTTP security headers such as Content-Security-Policy.