diff --git a/pkg/server/api/ratelimit-handlers.go b/pkg/server/api/ratelimit-handlers.go deleted file mode 100644 index ab35284de..000000000 --- a/pkg/server/api/ratelimit-handlers.go +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Minimalist Object Storage, (C) 2015 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package api - -import "net/http" - -// rateLimit -type rateLimit struct { - handler http.Handler - rateQueue chan bool -} - -func (c rateLimit) Add() { - c.rateQueue <- true // fill in the queue - return -} - -func (c rateLimit) Remove() { - <-c.rateQueue // invalidate the queue, after the request is served - return -} - -// ServeHTTP is an http.Handler ServeHTTP method -func (c rateLimit) ServeHTTP(w http.ResponseWriter, req *http.Request) { - c.Add() // add - c.handler.ServeHTTP(w, req) // serve - c.Remove() // remove -} - -// RateLimitHandler limits the number of concurrent http requests -func RateLimitHandler(handle http.Handler, limit int) http.Handler { - return rateLimit{ - handler: handle, - rateQueue: make(chan bool, limit), - } -} diff --git a/pkg/server/minhttp/http.go b/pkg/server/minhttp/http.go index 57ed99a4d..6c2d71467 100644 --- a/pkg/server/minhttp/http.go +++ b/pkg/server/minhttp/http.go @@ -165,3 +165,50 @@ func ListenAndServe(servers ...*http.Server) error { return nil } } + +// ListenAndServeLimited is similar to ListenAndServe but ratelimited with connLimit value +func ListenAndServeLimited(connLimit int, servers ...*http.Server) error { + // get parent process id + ppid := os.Getppid() + + a := &app{ + servers: servers, + listeners: make([]net.Listener, 0, len(servers)), + sds: make([]httpdown.Server, 0, len(servers)), + net: &minNet{connLimit: connLimit}, + errors: make(chan error, 1+(len(servers)*2)), + } + + // Acquire Listeners + if err := a.listen(); err != nil { + return iodine.New(err, nil) + } + + // Start serving. + a.serve() + + // Close the parent if we inherited and it wasn't init that started us. + if os.Getenv("LISTEN_FDS") != "" && ppid != 1 { + if err := syscall.Kill(ppid, syscall.SIGTERM); err != nil { + return iodine.New(err, nil) + } + } + + waitdone := make(chan struct{}) + go func() { + defer close(waitdone) + a.wait() + // communicate by sending not by closing a channel + waitdone <- struct{}{} + }() + + select { + case err := <-a.errors: + if err == nil { + panic("unexpected nil error") + } + return iodine.New(err, nil) + case <-waitdone: + return nil + } +} diff --git a/pkg/server/minhttp/listen.go b/pkg/server/minhttp/listen.go new file mode 100644 index 000000000..3053c581c --- /dev/null +++ b/pkg/server/minhttp/listen.go @@ -0,0 +1,61 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package minhttp + +import ( + "net" + "sync" + + "github.com/minio/minio/pkg/iodine" +) + +// rateLimitedListener returns a Listener that accepts at most n simultaneous +// connections from the provided Listener. +func rateLimitedListener(l net.Listener, nconn int) net.Listener { + return &rateLimitListener{l, make(chan struct{}, nconn)} +} + +type rateLimitListener struct { + net.Listener + sem chan struct{} +} + +func (l *rateLimitListener) accept() { l.sem <- struct{}{} } +func (l *rateLimitListener) release() { <-l.sem } + +func (l *rateLimitListener) Accept() (net.Conn, error) { + l.accept() + + c, err := l.Listener.Accept() + if err != nil { + l.release() + return nil, iodine.New(err, nil) + } + return &rateLimitListenerConn{Conn: c, release: l.release}, nil +} + +type rateLimitListenerConn struct { + net.Conn + releaseOnce sync.Once + release func() +} + +func (l *rateLimitListenerConn) Close() error { + err := l.Conn.Close() + l.releaseOnce.Do(l.release) + return iodine.New(err, nil) +} diff --git a/pkg/server/minhttp/net.go b/pkg/server/minhttp/net.go index 0bdbe6514..550d9ffe6 100644 --- a/pkg/server/minhttp/net.go +++ b/pkg/server/minhttp/net.go @@ -55,6 +55,7 @@ var originalWD, _ = os.Getwd() type minNet struct { inheritedListeners []net.Listener activeListeners []net.Listener + connLimit int mutex sync.Mutex inheritOnce sync.Once } @@ -130,7 +131,8 @@ func (n *minNet) Listen(nett, laddr string) (net.Listener, error) { // ListenTCP announces on the local network address laddr. The network net must // be: "tcp", "tcp4" or "tcp6". It returns an inherited net.Listener for the // matching network and address, or creates a new one using net.ListenTCP. -func (n *minNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, error) { +func (n *minNet) ListenTCP(nett string, laddr *net.TCPAddr) (net.Listener, error) { + var err error if err := n.getInheritedListeners(); err != nil { return nil, iodine.New(err, nil) } @@ -151,11 +153,15 @@ func (n *minNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, e } } + var l net.Listener // make a fresh listener - l, err := net.ListenTCP(nett, laddr) + l, err = net.ListenTCP(nett, laddr) if err != nil { return nil, iodine.New(err, nil) } + if n.connLimit > 0 { + l = rateLimitedListener(l, n.connLimit) + } n.activeListeners = append(n.activeListeners, l) return l, nil } @@ -163,7 +169,8 @@ func (n *minNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, e // ListenUnix announces on the local network address laddr. The network net // must be a: "unix" or "unixpacket". It returns an inherited net.Listener for // the matching network and address, or creates a new one using net.ListenUnix. -func (n *minNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener, error) { +func (n *minNet) ListenUnix(nett string, laddr *net.UnixAddr) (net.Listener, error) { + var err error if err := n.getInheritedListeners(); err != nil { return nil, iodine.New(err, nil) } @@ -184,11 +191,15 @@ func (n *minNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener } } + var l net.Listener // make a fresh listener - l, err := net.ListenUnix(nett, laddr) + l, err = net.ListenUnix(nett, laddr) if err != nil { return nil, iodine.New(err, nil) } + if n.connLimit > 0 { + l = rateLimitedListener(l, n.connLimit) + } n.activeListeners = append(n.activeListeners, l) return l, nil } diff --git a/pkg/server/router.go b/pkg/server/router.go index 4c40b7fe9..9bcf76435 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -89,7 +89,6 @@ func registerCustomMiddleware(mux http.Handler, conf api.Config) http.Handler { ) mux = ch.final(mux) - mux = api.RateLimitHandler(mux, conf.RateLimit) return mux } diff --git a/pkg/server/server.go b/pkg/server/server.go index b3df6e46c..65a3943d6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -114,7 +114,7 @@ func StartServices(conf api.Config) error { // start ticket master go startTM(minioAPI) - if err := minhttp.ListenAndServe(apiServer, rpcServer); err != nil { + if err := minhttp.ListenAndServeLimited(conf.RateLimit, apiServer, rpcServer); err != nil { return iodine.New(err, nil) } return nil