Add a ratelimited listener than a ratelimited handler - more precise

This commit is contained in:
Harshavardhana 2015-07-13 10:26:04 -07:00
parent 8af5933b07
commit 1bad92356d
6 changed files with 124 additions and 56 deletions

View file

@ -1,50 +0,0 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package api
import "net/http"
// rateLimit
type rateLimit struct {
handler http.Handler
rateQueue chan bool
}
func (c rateLimit) Add() {
c.rateQueue <- true // fill in the queue
return
}
func (c rateLimit) Remove() {
<-c.rateQueue // invalidate the queue, after the request is served
return
}
// ServeHTTP is an http.Handler ServeHTTP method
func (c rateLimit) ServeHTTP(w http.ResponseWriter, req *http.Request) {
c.Add() // add
c.handler.ServeHTTP(w, req) // serve
c.Remove() // remove
}
// RateLimitHandler limits the number of concurrent http requests
func RateLimitHandler(handle http.Handler, limit int) http.Handler {
return rateLimit{
handler: handle,
rateQueue: make(chan bool, limit),
}
}

View file

@ -165,3 +165,50 @@ func ListenAndServe(servers ...*http.Server) error {
return nil
}
}
// ListenAndServeLimited is similar to ListenAndServe but ratelimited with connLimit value
func ListenAndServeLimited(connLimit int, servers ...*http.Server) error {
// get parent process id
ppid := os.Getppid()
a := &app{
servers: servers,
listeners: make([]net.Listener, 0, len(servers)),
sds: make([]httpdown.Server, 0, len(servers)),
net: &minNet{connLimit: connLimit},
errors: make(chan error, 1+(len(servers)*2)),
}
// Acquire Listeners
if err := a.listen(); err != nil {
return iodine.New(err, nil)
}
// Start serving.
a.serve()
// Close the parent if we inherited and it wasn't init that started us.
if os.Getenv("LISTEN_FDS") != "" && ppid != 1 {
if err := syscall.Kill(ppid, syscall.SIGTERM); err != nil {
return iodine.New(err, nil)
}
}
waitdone := make(chan struct{})
go func() {
defer close(waitdone)
a.wait()
// communicate by sending not by closing a channel
waitdone <- struct{}{}
}()
select {
case err := <-a.errors:
if err == nil {
panic("unexpected nil error")
}
return iodine.New(err, nil)
case <-waitdone:
return nil
}
}

View file

@ -0,0 +1,61 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minhttp
import (
"net"
"sync"
"github.com/minio/minio/pkg/iodine"
)
// rateLimitedListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener.
func rateLimitedListener(l net.Listener, nconn int) net.Listener {
return &rateLimitListener{l, make(chan struct{}, nconn)}
}
type rateLimitListener struct {
net.Listener
sem chan struct{}
}
func (l *rateLimitListener) accept() { l.sem <- struct{}{} }
func (l *rateLimitListener) release() { <-l.sem }
func (l *rateLimitListener) Accept() (net.Conn, error) {
l.accept()
c, err := l.Listener.Accept()
if err != nil {
l.release()
return nil, iodine.New(err, nil)
}
return &rateLimitListenerConn{Conn: c, release: l.release}, nil
}
type rateLimitListenerConn struct {
net.Conn
releaseOnce sync.Once
release func()
}
func (l *rateLimitListenerConn) Close() error {
err := l.Conn.Close()
l.releaseOnce.Do(l.release)
return iodine.New(err, nil)
}

View file

@ -55,6 +55,7 @@ var originalWD, _ = os.Getwd()
type minNet struct {
inheritedListeners []net.Listener
activeListeners []net.Listener
connLimit int
mutex sync.Mutex
inheritOnce sync.Once
}
@ -130,7 +131,8 @@ func (n *minNet) Listen(nett, laddr string) (net.Listener, error) {
// ListenTCP announces on the local network address laddr. The network net must
// be: "tcp", "tcp4" or "tcp6". It returns an inherited net.Listener for the
// matching network and address, or creates a new one using net.ListenTCP.
func (n *minNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, error) {
func (n *minNet) ListenTCP(nett string, laddr *net.TCPAddr) (net.Listener, error) {
var err error
if err := n.getInheritedListeners(); err != nil {
return nil, iodine.New(err, nil)
}
@ -151,11 +153,15 @@ func (n *minNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, e
}
}
var l net.Listener
// make a fresh listener
l, err := net.ListenTCP(nett, laddr)
l, err = net.ListenTCP(nett, laddr)
if err != nil {
return nil, iodine.New(err, nil)
}
if n.connLimit > 0 {
l = rateLimitedListener(l, n.connLimit)
}
n.activeListeners = append(n.activeListeners, l)
return l, nil
}
@ -163,7 +169,8 @@ func (n *minNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, e
// ListenUnix announces on the local network address laddr. The network net
// must be a: "unix" or "unixpacket". It returns an inherited net.Listener for
// the matching network and address, or creates a new one using net.ListenUnix.
func (n *minNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener, error) {
func (n *minNet) ListenUnix(nett string, laddr *net.UnixAddr) (net.Listener, error) {
var err error
if err := n.getInheritedListeners(); err != nil {
return nil, iodine.New(err, nil)
}
@ -184,11 +191,15 @@ func (n *minNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener
}
}
var l net.Listener
// make a fresh listener
l, err := net.ListenUnix(nett, laddr)
l, err = net.ListenUnix(nett, laddr)
if err != nil {
return nil, iodine.New(err, nil)
}
if n.connLimit > 0 {
l = rateLimitedListener(l, n.connLimit)
}
n.activeListeners = append(n.activeListeners, l)
return l, nil
}

View file

@ -89,7 +89,6 @@ func registerCustomMiddleware(mux http.Handler, conf api.Config) http.Handler {
)
mux = ch.final(mux)
mux = api.RateLimitHandler(mux, conf.RateLimit)
return mux
}

View file

@ -114,7 +114,7 @@ func StartServices(conf api.Config) error {
// start ticket master
go startTM(minioAPI)
if err := minhttp.ListenAndServe(apiServer, rpcServer); err != nil {
if err := minhttp.ListenAndServeLimited(conf.RateLimit, apiServer, rpcServer); err != nil {
return iodine.New(err, nil)
}
return nil