Honor connection pooling while tracing (#7979)

This PR removes the reliance on r.Context().Done()
that required setting the

```
Connection: "close"
```

HTTP header. That header is detrimental to
client-side connection pooling: it explicitly
tells clients to turn off connection pooling,
so connections are proactively closed, leaving
many of them in the TIME_WAIT state. This can
be observed with `mc admin trace -a` when
running a distributed setup.
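
Instead of closing the connection, the handlers now detect a gone client by periodically writing a keep-alive byte and bailing out when that write fails. Below is a minimal sketch of the pattern, assuming a hypothetical `/trace` endpoint and an `eventCh` feeding the stream; it mirrors the approach in this PR but is not the actual MinIO handler:

```go
package main

import (
	"net/http"
	"time"
)

// traceHandler streams events without setting "Connection: close".
// Rather than waiting on r.Context().Done(), it writes a keep-alive
// byte on a ticker; a failed write means the client went away, so the
// handler returns and the connection stays eligible for normal pooling.
func traceHandler(w http.ResponseWriter, r *http.Request, eventCh <-chan []byte) {
	w.Header().Set("Content-Type", "text/event-stream")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	keepAlive := time.NewTicker(500 * time.Millisecond)
	defer keepAlive.Stop()

	for {
		select {
		case ev := <-eventCh:
			if _, err := w.Write(ev); err != nil {
				return
			}
			flusher.Flush()
		case <-keepAlive.C:
			// The single space doubles as a liveness probe: once the
			// client hangs up, this write fails and the loop exits.
			if _, err := w.Write([]byte(" ")); err != nil {
				return
			}
			flusher.Flush()
		}
	}
}

func main() {
	eventCh := make(chan []byte, 100)
	http.HandleFunc("/trace", func(w http.ResponseWriter, r *http.Request) {
		traceHandler(w, r, eventCh)
	})
	http.ListenAndServe(":8080", nil)
}
```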

This PR also fixes a trace filtering issue:
when a bucket name merely has `minio` as a
prefix, trace was erroneously ignoring
requests to that bucket.
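
The old filter compared request paths against the reserved prefix without the trailing slash separator, so a user bucket such as `miniobucket` matched and was hidden from the trace. A small sketch of the difference; the bucket and object names are made up for illustration:

```go
package main

import (
	"fmt"
	"strings"
)

const minioReservedBucketPath = "/minio"

func main() {
	// Request to a user bucket whose name merely starts with "minio".
	path := "/miniobucket/object.txt"

	// Old check: matches, so the request was wrongly hidden from trace.
	oldFiltered := strings.HasPrefix(path, minioReservedBucketPath)

	// New check: includes the slash separator, so only paths under the
	// reserved "minio" bucket itself ("/minio/...") are filtered out.
	newFiltered := strings.HasPrefix(path, minioReservedBucketPath+"/")

	fmt.Println(oldFiltered, newFiltered) // true false
}
```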
Harshavardhana 2019-07-31 11:08:39 -07:00 committed by kannappanr
parent cbd02c58be
commit 123cccaed1
4 changed files with 48 additions and 59 deletions


@@ -1459,6 +1459,23 @@ func (a adminAPIHandlers) SetConfigKeysHandler(w http.ResponseWriter, r *http.Re
writeSuccessResponseHeadersOnly(w)
}
// mustTrace returns true if the trace.Info entry should be traced,
// and false when any of these conditions fails:
// - the input entry is of type trace.Info
// - with errOnly, the response status code is an error (not 2xx/3xx)
// - with trcAll, all entries are traced; otherwise only S3 API
//   requests (paths outside the reserved "minio" bucket) are traced.
func mustTrace(entry interface{}, trcAll, errOnly bool) bool {
trcInfo, ok := entry.(trace.Info)
if !ok {
return false
}
trace := trcAll || !hasPrefix(trcInfo.ReqInfo.Path, minioReservedBucketPath+slashSeparator)
if errOnly {
return trace && trcInfo.RespInfo.StatusCode >= http.StatusBadRequest
}
return trace
}
// TraceHandler - POST /minio/admin/v1/trace
// ----------
// The handler sends http trace to the connected HTTP client.
@@ -1474,10 +1491,6 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
return
}
// Avoid reusing tcp connection if read timeout is hit
// This is needed to make r.Context().Done() work as
// expected in case of read timeout
w.Header().Set(xhttp.Connection, "close")
w.Header().Set(xhttp.ContentType, "text/event-stream")
doneCh := make(chan struct{})
@@ -1487,28 +1500,22 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
// Use buffered channel to take care of burst sends or slow w.Write()
traceCh := make(chan interface{}, 4000)
filter := func(entry interface{}) bool {
trcInfo := entry.(trace.Info)
if trcErr && isHTTPStatusOK(trcInfo.RespInfo.StatusCode) {
return false
}
if trcAll {
return true
}
return !strings.HasPrefix(trcInfo.ReqInfo.Path, minioReservedBucketPath)
}
remoteHosts := getRemoteHosts(globalEndpoints)
peers, err := getRestClients(remoteHosts)
peers, err := getRestClients(getRemoteHosts(globalEndpoints))
if err != nil {
return
}
globalHTTPTrace.Subscribe(traceCh, doneCh, filter)
globalHTTPTrace.Subscribe(traceCh, doneCh, func(entry interface{}) bool {
return mustTrace(entry, trcAll, trcErr)
})
for _, peer := range peers {
peer.Trace(traceCh, doneCh, trcAll, trcErr)
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
enc := json.NewEncoder(w)
for {
select {
@@ -1517,8 +1524,11 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
return
}
w.(http.Flusher).Flush()
case <-r.Context().Done():
return
case <-keepAliveTicker.C:
if _, err := w.Write([]byte(" ")); err != nil {
return
}
w.(http.Flusher).Flush()
case <-GlobalServiceDoneCh:
return
}


@@ -394,20 +394,3 @@ func getHostName(r *http.Request) (hostName string) {
}
return
}
func isHTTPStatusOK(statusCode int) bool {
// List of success status.
var successStatus = []int{
http.StatusOK,
http.StatusCreated,
http.StatusAccepted,
http.StatusNoContent,
http.StatusPartialContent,
}
for _, okstatus := range successStatus {
if statusCode == okstatus {
return true
}
}
return false
}
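
The removed helper enumerated individual 2xx success codes; mustTrace, added earlier in this change, replaces it with a single `statusCode >= http.StatusBadRequest` comparison, so 3xx responses are also skipped in error-only mode, matching its doc comment. A quick illustrative check of the two predicates:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	codes := []int{http.StatusOK, http.StatusNoContent, http.StatusPartialContent,
		http.StatusTemporaryRedirect, http.StatusNotFound, http.StatusInternalServerError}
	for _, code := range codes {
		// Old predicate: membership in an explicit list of success codes.
		oldOK := code == http.StatusOK || code == http.StatusCreated ||
			code == http.StatusAccepted || code == http.StatusNoContent ||
			code == http.StatusPartialContent
		// New predicate: anything from 400 upward counts as an error entry.
		newErr := code >= http.StatusBadRequest
		fmt.Printf("%d: old success=%v, new error=%v\n", code, oldOK, newErr)
	}
}
```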


@@ -499,10 +499,12 @@ func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan stru
if err = dec.Decode(&info); err != nil {
return
}
select {
case traceCh <- info:
default:
// Do not block on slow receivers.
if len(info.NodeName) > 0 {
select {
case traceCh <- info:
default:
// Do not block on slow receivers.
}
}
}
}
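
On the receiving side, the peer REST client now skips entries with an empty NodeName, which filters out the zero-value trace.Info keep-alives the peer server encodes on its ticker (see the last hunk below), while keeping the non-blocking forward to traceCh. A standalone sketch of that pattern, using a simplified stand-in type for trace.Info:

```go
package main

import "fmt"

// traceInfo is a stand-in for trace.Info; only NodeName matters here.
type traceInfo struct {
	NodeName string
	Msg      string
}

// forward drops keep-alive placeholders (empty NodeName) and uses a
// non-blocking send so a slow or full receiver never stalls the
// decoding loop.
func forward(traceCh chan<- traceInfo, info traceInfo) {
	if len(info.NodeName) == 0 {
		return // zero-value keep-alive from the peer, nothing to trace
	}
	select {
	case traceCh <- info:
	default:
		// Do not block on slow receivers; the entry is dropped.
	}
}

func main() {
	ch := make(chan traceInfo, 1)
	forward(ch, traceInfo{})                                           // keep-alive, skipped
	forward(ch, traceInfo{NodeName: "node1", Msg: "GET /bucket/obj"})  // queued
	forward(ch, traceInfo{NodeName: "node2", Msg: "PUT /bucket/obj"})  // dropped, buffer full
	fmt.Println(len(ch)) // 1
}
```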


@@ -28,7 +28,6 @@ import (
"time"
"github.com/gorilla/mux"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/lifecycle"
@@ -719,30 +718,22 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
trcAll := r.URL.Query().Get(peerRESTTraceAll) == "true"
trcErr := r.URL.Query().Get(peerRESTTraceErr) == "true"
w.Header().Set(xhttp.Connection, "close")
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
filter := func(entry interface{}) bool {
trcInfo := entry.(trace.Info)
if trcErr && isHTTPStatusOK(trcInfo.RespInfo.StatusCode) {
return false
}
if trcAll {
return true
}
return !strings.HasPrefix(trcInfo.ReqInfo.Path, minioReservedBucketPath)
}
doneCh := make(chan struct{})
defer close(doneCh)
// Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers.
// Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan interface{}, 2000)
globalHTTPTrace.Subscribe(ch, doneCh, filter)
globalHTTPTrace.Subscribe(ch, doneCh, func(entry interface{}) bool {
return mustTrace(entry, trcAll, trcErr)
})
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
enc := gob.NewEncoder(w)
for {
@@ -752,8 +743,11 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
return
}
w.(http.Flusher).Flush()
case <-r.Context().Done():
return
case <-keepAliveTicker.C:
if err := enc.Encode(&trace.Info{}); err != nil {
return
}
w.(http.Flusher).Flush()
}
}
}