From 95ddf061ab355f4165900c64773a1888bc8a9afd Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Tue, 26 Jul 2016 23:17:11 +0200 Subject: [PATCH] Rate limit is working and supports limited waiting clients (#2295) --- rate-limit-handler.go | 49 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/rate-limit-handler.go b/rate-limit-handler.go index 2972be213..45b7fb424 100644 --- a/rate-limit-handler.go +++ b/rate-limit-handler.go @@ -17,36 +17,64 @@ package main import ( + "errors" "net/http" "sync" ) +var errTooManyRequests = errors.New("Too many clients in the waiting list") + // rateLimit - represents datatype of the functionality implemented to // limit the number of concurrent http requests. type rateLimit struct { - handler http.Handler - rqueue chan struct{} - releaseOnce sync.Once + handler http.Handler + lock sync.Mutex + workQueue chan struct{} + waitQueue chan struct{} } // acquire and release implement a way to send and receive from the // channel this is in-turn used to rate limit incoming connections in // ServeHTTP() http.Handler method. -func (c *rateLimit) acquire() { c.rqueue <- struct{}{} } -func (c *rateLimit) release() { <-c.rqueue } +func (c *rateLimit) acquire() error { + // Kick out clients when it is really crowded + if len(c.waitQueue) == cap(c.waitQueue) { + return errTooManyRequests + } + + // Add new element in waitQueue to keep track of clients + // wanting to process their requests + c.waitQueue <- struct{}{} + + // Moving from wait to work queue is protected by a mutex + // to avoid draining waitQueue with multiple simultaneous clients. + c.lock.Lock() + c.workQueue <- <-c.waitQueue + c.lock.Unlock() + return nil +} + +// Release one element from workQueue to serve a new client +// in the waiting list +func (c *rateLimit) release() { + <-c.workQueue +} // ServeHTTP is an http.Handler ServeHTTP method, implemented to rate // limit incoming HTTP requests. func (c *rateLimit) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Acquire the connection if queue is not full, otherwise // code path waits here until the previous case is true. - c.acquire() + if err := c.acquire(); err != nil { + w.WriteHeader(http.StatusTooManyRequests) + return + } // Serves the request. c.handler.ServeHTTP(w, r) - // Release by draining the channel once. - c.releaseOnce.Do(c.release) + // Release + c.release() } // setRateLimitHandler limits the number of concurrent http requests based on MINIO_MAXCONN. @@ -57,7 +85,8 @@ func setRateLimitHandler(handler http.Handler) http.Handler { // For max connection limit of > '0' we initialize rate limit handler. return &rateLimit{ - handler: handler, - rqueue: make(chan struct{}, globalMaxConn), + handler: handler, + workQueue: make(chan struct{}, globalMaxConn), + waitQueue: make(chan struct{}, globalMaxConn*4), } }