Add simple Ticket Master which pro-actively sends messages on proceedChannel

Handlers are going to wait on proceedChannel, this the initial step towards
providing priority for different set of API operations
master
Harshavardhana 10 years ago
parent 5cfb05465e
commit 12bde7df30
  1. 32
      pkg/server/api/api.go
  2. 75
      pkg/server/api/bucket-handlers.go
  3. 108
      pkg/server/api/object-handlers.go
  4. 44
      pkg/server/router.go
  5. 23
      pkg/server/server.go

@ -0,0 +1,32 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package api
// Operation container for individual operations read by Ticket Master
type Operation struct {
ProceedCh chan struct{}
}
// Minio container for API and also carries OP (operation) channel
type Minio struct {
OP chan Operation
}
// New instantiate a new minio API
func New() Minio {
return Minio{OP: make(chan Operation)}
}

@ -23,10 +23,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
// MinioAPI - func (api Minio) isValidOp(w http.ResponseWriter, req *http.Request, acceptsContentType contentType) bool {
type MinioAPI struct{}
func (api MinioAPI) isValidOp(w http.ResponseWriter, req *http.Request, acceptsContentType contentType) bool {
vars := mux.Vars(req) vars := mux.Vars(req)
bucket := vars["bucket"] bucket := vars["bucket"]
log.Println(bucket) log.Println(bucket)
@ -40,8 +37,17 @@ func (api MinioAPI) isValidOp(w http.ResponseWriter, req *http.Request, acceptsC
// using the Initiate Multipart Upload request, but has not yet been completed or aborted. // using the Initiate Multipart Upload request, but has not yet been completed or aborted.
// This operation returns at most 1,000 multipart uploads in the response. // This operation returns at most 1,000 multipart uploads in the response.
// //
func (api MinioAPI) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) ListMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType) log.Println(acceptsContentType)
resources := getBucketMultipartResources(req.URL.Query()) resources := getBucketMultipartResources(req.URL.Query())
@ -60,8 +66,19 @@ func (api MinioAPI) ListMultipartUploadsHandler(w http.ResponseWriter, req *http
// of the objects in a bucket. You can use the request parameters as selection // of the objects in a bucket. You can use the request parameters as selection
// criteria to return a subset of the objects in a bucket. // criteria to return a subset of the objects in a bucket.
// //
func (api MinioAPI) ListObjectsHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) ListObjectsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// verify if bucket allows this operation // verify if bucket allows this operation
if !api.isValidOp(w, req, acceptsContentType) { if !api.isValidOp(w, req, acceptsContentType) {
return return
@ -87,7 +104,7 @@ func (api MinioAPI) ListObjectsHandler(w http.ResponseWriter, req *http.Request)
// ----------- // -----------
// This implementation of the GET operation returns a list of all buckets // This implementation of the GET operation returns a list of all buckets
// owned by the authenticated sender of the request. // owned by the authenticated sender of the request.
func (api MinioAPI) ListBucketsHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) ListBucketsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
// uncomment this when we have webcli // uncomment this when we have webcli
// without access key credentials one cannot list buckets // without access key credentials one cannot list buckets
@ -95,13 +112,21 @@ func (api MinioAPI) ListBucketsHandler(w http.ResponseWriter, req *http.Request)
// writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path) // writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path)
// return // return
// } // }
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType) log.Println(acceptsContentType)
} }
// PutBucketHandler - PUT Bucket // PutBucketHandler - PUT Bucket
// ---------- // ----------
// This implementation of the PUT operation creates a new bucket for authenticated request // This implementation of the PUT operation creates a new bucket for authenticated request
func (api MinioAPI) PutBucketHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) PutBucketHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
// uncomment this when we have webcli // uncomment this when we have webcli
// without access key credentials one cannot create a bucket // without access key credentials one cannot create a bucket
@ -109,6 +134,16 @@ func (api MinioAPI) PutBucketHandler(w http.ResponseWriter, req *http.Request) {
// writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path) // writeErrorResponse(w, req, AccessDenied, acceptsContentType, req.URL.Path)
// return // return
// } // }
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
if isRequestBucketACL(req.URL.Query()) { if isRequestBucketACL(req.URL.Query()) {
api.PutBucketACLHandler(w, req) api.PutBucketACLHandler(w, req)
return return
@ -128,8 +163,19 @@ func (api MinioAPI) PutBucketHandler(w http.ResponseWriter, req *http.Request) {
// PutBucketACLHandler - PUT Bucket ACL // PutBucketACLHandler - PUT Bucket ACL
// ---------- // ----------
// This implementation of the PUT operation modifies the bucketACL for authenticated request // This implementation of the PUT operation modifies the bucketACL for authenticated request
func (api MinioAPI) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) PutBucketACLHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// read from 'x-amz-acl' // read from 'x-amz-acl'
aclType := getACLType(req) aclType := getACLType(req)
if aclType == unsupportedACLType { if aclType == unsupportedACLType {
@ -148,8 +194,17 @@ func (api MinioAPI) PutBucketACLHandler(w http.ResponseWriter, req *http.Request
// The operation returns a 200 OK if the bucket exists and you // The operation returns a 200 OK if the bucket exists and you
// have permission to access it. Otherwise, the operation might // have permission to access it. Otherwise, the operation might
// return responses such as 404 Not Found and 403 Forbidden. // return responses such as 404 Not Found and 403 Forbidden.
func (api MinioAPI) HeadBucketHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) HeadBucketHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType) log.Println(acceptsContentType)
vars := mux.Vars(req) vars := mux.Vars(req)

@ -36,8 +36,19 @@ const (
// ---------- // ----------
// This implementation of the GET operation retrieves object. To use GET, // This implementation of the GET operation retrieves object. To use GET,
// you must have READ access to the object. // you must have READ access to the object.
func (api MinioAPI) GetObjectHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) GetObjectHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// verify if this operation is allowed // verify if this operation is allowed
if !api.isValidOp(w, req, acceptsContentType) { if !api.isValidOp(w, req, acceptsContentType) {
return return
@ -54,8 +65,19 @@ func (api MinioAPI) GetObjectHandler(w http.ResponseWriter, req *http.Request) {
// HeadObjectHandler - HEAD Object // HeadObjectHandler - HEAD Object
// ----------- // -----------
// The HEAD operation retrieves metadata from an object without returning the object itself. // The HEAD operation retrieves metadata from an object without returning the object itself.
func (api MinioAPI) HeadObjectHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) HeadObjectHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// verify if this operation is allowed // verify if this operation is allowed
if !api.isValidOp(w, req, acceptsContentType) { if !api.isValidOp(w, req, acceptsContentType) {
return return
@ -71,8 +93,19 @@ func (api MinioAPI) HeadObjectHandler(w http.ResponseWriter, req *http.Request)
// PutObjectHandler - PUT Object // PutObjectHandler - PUT Object
// ---------- // ----------
// This implementation of the PUT operation adds an object to a bucket. // This implementation of the PUT operation adds an object to a bucket.
func (api MinioAPI) PutObjectHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// verify if this operation is allowed // verify if this operation is allowed
if !api.isValidOp(w, req, acceptsContentType) { if !api.isValidOp(w, req, acceptsContentType) {
return return
@ -121,8 +154,19 @@ func (api MinioAPI) PutObjectHandler(w http.ResponseWriter, req *http.Request) {
/// Multipart API /// Multipart API
// NewMultipartUploadHandler - New multipart upload // NewMultipartUploadHandler - New multipart upload
func (api MinioAPI) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) NewMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// handle ACL's here at bucket level // handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) { if !api.isValidOp(w, req, acceptsContentType) {
return return
@ -141,8 +185,19 @@ func (api MinioAPI) NewMultipartUploadHandler(w http.ResponseWriter, req *http.R
} }
// PutObjectPartHandler - Upload part // PutObjectPartHandler - Upload part
func (api MinioAPI) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// handle ACL's here at bucket level // handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) { if !api.isValidOp(w, req, acceptsContentType) {
return return
@ -190,8 +245,19 @@ func (api MinioAPI) PutObjectPartHandler(w http.ResponseWriter, req *http.Reques
} }
// AbortMultipartUploadHandler - Abort multipart upload // AbortMultipartUploadHandler - Abort multipart upload
func (api MinioAPI) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) AbortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// handle ACL's here at bucket level // handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) { if !api.isValidOp(w, req, acceptsContentType) {
return return
@ -206,8 +272,19 @@ func (api MinioAPI) AbortMultipartUploadHandler(w http.ResponseWriter, req *http
} }
// ListObjectPartsHandler - List object parts // ListObjectPartsHandler - List object parts
func (api MinioAPI) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) ListObjectPartsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// handle ACL's here at bucket level // handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) { if !api.isValidOp(w, req, acceptsContentType) {
return return
@ -225,8 +302,19 @@ func (api MinioAPI) ListObjectPartsHandler(w http.ResponseWriter, req *http.Requ
} }
// CompleteMultipartUploadHandler - Complete multipart upload // CompleteMultipartUploadHandler - Complete multipart upload
func (api MinioAPI) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) CompleteMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req) acceptsContentType := getContentType(req)
op := Operation{}
op.ProceedCh = make(chan struct{})
api.OP <- op
// block until Ticket master gives us a go
<-op.ProceedCh
{
// do you operation
}
log.Println(acceptsContentType)
// handle ACL's here at bucket level // handle ACL's here at bucket level
if !api.isValidOp(w, req, acceptsContentType) { if !api.isValidOp(w, req, acceptsContentType) {
return return
@ -262,13 +350,13 @@ func (api MinioAPI) CompleteMultipartUploadHandler(w http.ResponseWriter, req *h
/// Delete API /// Delete API
// DeleteBucketHandler - Delete bucket // DeleteBucketHandler - Delete bucket
func (api MinioAPI) DeleteBucketHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) DeleteBucketHandler(w http.ResponseWriter, req *http.Request) {
error := getErrorCode(NotImplemented) error := getErrorCode(NotImplemented)
w.WriteHeader(error.HTTPStatusCode) w.WriteHeader(error.HTTPStatusCode)
} }
// DeleteObjectHandler - Delete object // DeleteObjectHandler - Delete object
func (api MinioAPI) DeleteObjectHandler(w http.ResponseWriter, req *http.Request) { func (api Minio) DeleteObjectHandler(w http.ResponseWriter, req *http.Request) {
error := getErrorCode(NotImplemented) error := getErrorCode(NotImplemented)
w.WriteHeader(error.HTTPStatusCode) w.WriteHeader(error.HTTPStatusCode)
} }

@ -24,28 +24,31 @@ import (
"github.com/minio/minio/pkg/server/rpc" "github.com/minio/minio/pkg/server/rpc"
) )
func getAPI() api.Minio {
a := api.New()
return a
}
// registerAPI - register all the object API handlers to their respective paths // registerAPI - register all the object API handlers to their respective paths
func registerAPI(mux *router.Router) http.Handler { func registerAPI(mux *router.Router, a api.Minio) http.Handler {
api := api.MinioAPI{} mux.HandleFunc("/", a.ListBucketsHandler).Methods("GET")
mux.HandleFunc("/{bucket}", a.ListObjectsHandler).Methods("GET")
mux.HandleFunc("/", api.ListBucketsHandler).Methods("GET") mux.HandleFunc("/{bucket}", a.PutBucketHandler).Methods("PUT")
mux.HandleFunc("/{bucket}", api.ListObjectsHandler).Methods("GET") mux.HandleFunc("/{bucket}", a.HeadBucketHandler).Methods("HEAD")
mux.HandleFunc("/{bucket}", api.PutBucketHandler).Methods("PUT") mux.HandleFunc("/{bucket}/{object:.*}", a.HeadObjectHandler).Methods("HEAD")
mux.HandleFunc("/{bucket}", api.HeadBucketHandler).Methods("HEAD") mux.HandleFunc("/{bucket}/{object:.*}", a.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT")
mux.HandleFunc("/{bucket}/{object:.*}", api.HeadObjectHandler).Methods("HEAD") mux.HandleFunc("/{bucket}/{object:.*}", a.ListObjectPartsHandler).Queries("uploadId", "{uploadId:.*}").Methods("GET")
mux.HandleFunc("/{bucket}/{object:.*}", api.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT") mux.HandleFunc("/{bucket}/{object:.*}", a.CompleteMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST")
mux.HandleFunc("/{bucket}/{object:.*}", api.ListObjectPartsHandler).Queries("uploadId", "{uploadId:.*}").Methods("GET") mux.HandleFunc("/{bucket}/{object:.*}", a.NewMultipartUploadHandler).Methods("POST")
mux.HandleFunc("/{bucket}/{object:.*}", api.CompleteMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST") mux.HandleFunc("/{bucket}/{object:.*}", a.AbortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("DELETE")
mux.HandleFunc("/{bucket}/{object:.*}", api.NewMultipartUploadHandler).Methods("POST") mux.HandleFunc("/{bucket}/{object:.*}", a.GetObjectHandler).Methods("GET")
mux.HandleFunc("/{bucket}/{object:.*}", api.AbortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("DELETE") mux.HandleFunc("/{bucket}/{object:.*}", a.PutObjectHandler).Methods("PUT")
mux.HandleFunc("/{bucket}/{object:.*}", api.GetObjectHandler).Methods("GET")
mux.HandleFunc("/{bucket}/{object:.*}", api.PutObjectHandler).Methods("PUT")
// not implemented yet // not implemented yet
mux.HandleFunc("/{bucket}", api.DeleteBucketHandler).Methods("DELETE") mux.HandleFunc("/{bucket}", a.DeleteBucketHandler).Methods("DELETE")
// unsupported API // unsupported API
mux.HandleFunc("/{bucket}/{object:.*}", api.DeleteObjectHandler).Methods("DELETE") mux.HandleFunc("/{bucket}/{object:.*}", a.DeleteObjectHandler).Methods("DELETE")
return mux return mux
} }
@ -102,9 +105,12 @@ func registerRPC(mux *router.Router, s *rpc.Server) http.Handler {
} }
// getAPIHandler api handler // getAPIHandler api handler
func getAPIHandler(conf api.Config) http.Handler { func getAPIHandler(conf api.Config) (http.Handler, api.Minio) {
mux := router.NewRouter() mux := router.NewRouter()
return registerOtherMiddleware(registerAPI(mux), conf) minioAPI := getAPI()
apiHandler := registerAPI(mux, minioAPI)
apiHandler = registerOtherMiddleware(apiHandler, conf)
return apiHandler, minioAPI
} }
// getRPCHandler rpc handler // getRPCHandler rpc handler

@ -25,13 +25,13 @@ import (
"github.com/minio/minio/pkg/server/api" "github.com/minio/minio/pkg/server/api"
) )
func startAPI(errCh chan error, conf api.Config) { func startAPI(errCh chan error, conf api.Config, apiHandler http.Handler) {
defer close(errCh) defer close(errCh)
// Minio server config // Minio server config
httpServer := &http.Server{ httpServer := &http.Server{
Addr: conf.Address, Addr: conf.Address,
Handler: getAPIHandler(conf), Handler: apiHandler,
MaxHeaderBytes: 1 << 20, MaxHeaderBytes: 1 << 20,
} }
@ -74,20 +74,24 @@ func startAPI(errCh chan error, conf api.Config) {
} }
} }
func startRPC(errCh chan error) { func startRPC(errCh chan error, rpcHandler http.Handler) {
defer close(errCh) defer close(errCh)
// Minio server config // Minio server config
httpServer := &http.Server{ httpServer := &http.Server{
Addr: "127.0.0.1:9001", // TODO make this configurable Addr: "127.0.0.1:9001", // TODO make this configurable
Handler: getRPCHandler(), Handler: rpcHandler,
MaxHeaderBytes: 1 << 20, MaxHeaderBytes: 1 << 20,
} }
errCh <- httpServer.ListenAndServe() errCh <- httpServer.ListenAndServe()
} }
func startTM(errCh chan error) { func startTM(a api.Minio) {
defer close(errCh) for {
for op := range a.OP {
close(op.ProceedCh)
}
}
} }
// StartServices starts basic services for a server // StartServices starts basic services for a server
@ -95,8 +99,11 @@ func StartServices(conf api.Config) error {
apiErrCh := make(chan error) apiErrCh := make(chan error)
rpcErrCh := make(chan error) rpcErrCh := make(chan error)
go startAPI(apiErrCh, conf) apiHandler, minioAPI := getAPIHandler(conf)
go startRPC(rpcErrCh) go startAPI(apiErrCh, conf, apiHandler)
rpcHandler := getRPCHandler()
go startRPC(rpcErrCh, rpcHandler)
go startTM(minioAPI)
select { select {
case err := <-apiErrCh: case err := <-apiErrCh:

Loading…
Cancel
Save