|
|
|
@ -12,6 +12,7 @@ import ( |
|
|
|
|
// Stores system configuration, populated from CLI or test runner
|
|
|
|
|
type GatewayConfig struct { |
|
|
|
|
StorageDriver StorageDriver |
|
|
|
|
requestBucketChan chan BucketRequest |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Message for requesting a bucket
|
|
|
|
@ -38,7 +39,7 @@ type StorageDriver func(bucket string, input chan ObjectRequest) |
|
|
|
|
type fakeContext struct{} |
|
|
|
|
|
|
|
|
|
type GatewayGetHandler struct { |
|
|
|
|
requestBucketChan chan BucketRequest |
|
|
|
|
config GatewayConfig |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GET requests server
|
|
|
|
@ -48,7 +49,7 @@ func (handler GatewayGetHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ |
|
|
|
|
path := vars["path"] |
|
|
|
|
context := fakeContext{} |
|
|
|
|
callback := make(chan Bucket) |
|
|
|
|
handler.requestBucketChan <- BucketRequest{ |
|
|
|
|
handler.config.requestBucketChan <- BucketRequest{ |
|
|
|
|
name: bucketName, |
|
|
|
|
context: context, |
|
|
|
|
callback: callback, |
|
|
|
@ -65,7 +66,7 @@ func (handler GatewayGetHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type GatewayPutHandler struct { |
|
|
|
|
requestBucketChan chan BucketRequest |
|
|
|
|
config GatewayConfig |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (handler GatewayPutHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { |
|
|
|
@ -75,7 +76,7 @@ func (handler GatewayPutHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ |
|
|
|
|
object, _ := ioutil.ReadAll(req.Body) |
|
|
|
|
context := fakeContext{} |
|
|
|
|
callback := make(chan Bucket) |
|
|
|
|
handler.requestBucketChan <- BucketRequest{ |
|
|
|
|
handler.config.requestBucketChan <- BucketRequest{ |
|
|
|
|
name: bucketName, |
|
|
|
|
context: context, |
|
|
|
|
callback: callback, |
|
|
|
@ -85,17 +86,17 @@ func (handler GatewayPutHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func RegisterGatewayHandlers(router *mux.Router, config GatewayConfig) { |
|
|
|
|
requestBucketChan := make(chan BucketRequest) |
|
|
|
|
go SynchronizedBucketService(requestBucketChan, config) |
|
|
|
|
getHandler := GatewayGetHandler{requestBucketChan: requestBucketChan} |
|
|
|
|
putHandler := GatewayPutHandler{requestBucketChan: requestBucketChan} |
|
|
|
|
config.requestBucketChan = make(chan BucketRequest) |
|
|
|
|
go SynchronizedBucketService(config) |
|
|
|
|
getHandler := GatewayGetHandler{config} |
|
|
|
|
putHandler := GatewayPutHandler{config} |
|
|
|
|
router.Handle("/{bucket}/{path:.*}", getHandler).Methods("GET") |
|
|
|
|
router.Handle("/{bucket}/{path:.*}", putHandler).Methods("PUT") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func SynchronizedBucketService(input chan BucketRequest, config GatewayConfig) { |
|
|
|
|
func SynchronizedBucketService(config GatewayConfig) { |
|
|
|
|
buckets := make(map[string]*SynchronizedBucket) |
|
|
|
|
for request := range input { |
|
|
|
|
for request := range config.requestBucketChan { |
|
|
|
|
if buckets[request.name] == nil { |
|
|
|
|
bucketChannel := make(chan ObjectRequest) |
|
|
|
|
go config.StorageDriver(request.name, bucketChannel) |
|
|
|
|