parent
ad8010f2a4
commit
b8981ce946
@ -1,265 +0,0 @@ |
||||
/* |
||||
* Mini Object Storage, (C) 2014 Minio, Inc. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package server |
||||
|
||||
import ( |
||||
"bytes" |
||||
"errors" |
||||
"fmt" |
||||
"io/ioutil" |
||||
"net/http" |
||||
"path" |
||||
|
||||
"github.com/gorilla/mux" |
||||
"github.com/minio-io/minio/pkg/storage/encodedstorage" |
||||
"github.com/tchap/go-patricia/patricia" |
||||
) |
||||
|
||||
// Stores system configuration, populated from CLI or test runner
|
||||
type GatewayConfig struct { |
||||
StorageDriver StorageDriver |
||||
BucketDriver BucketDriver |
||||
requestBucketChan chan BucketRequest |
||||
DataDir string |
||||
K, |
||||
M int |
||||
BlockSize uint64 |
||||
} |
||||
|
||||
// Message for requesting a bucket
|
||||
type BucketRequest struct { |
||||
name string |
||||
context Context |
||||
callback chan Bucket |
||||
} |
||||
|
||||
// Context interface for security and session information
|
||||
type Context interface{} |
||||
|
||||
// Bucket definition
|
||||
type Bucket interface { |
||||
GetName(Context) string |
||||
Get(Context, string) ([]byte, error) |
||||
Put(Context, string, []byte) error |
||||
} |
||||
|
||||
// Bucket driver function, should read from a channel and respond through callback channels
|
||||
type BucketDriver func(config GatewayConfig) |
||||
|
||||
// Storage driver function, should read from a channel and respond through callback channels
|
||||
type StorageDriver func(bucket string, input chan ObjectRequest, config GatewayConfig) |
||||
|
||||
// TODO remove when building real context
|
||||
type fakeContext struct{} |
||||
|
||||
type GatewayGetHandler struct { |
||||
config GatewayConfig |
||||
} |
||||
|
||||
// GET requests server
|
||||
func (handler GatewayGetHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { |
||||
vars := mux.Vars(req) |
||||
bucketName := vars["bucket"] |
||||
path := vars["path"] |
||||
context := fakeContext{} |
||||
callback := make(chan Bucket) |
||||
handler.config.requestBucketChan <- BucketRequest{ |
||||
name: bucketName, |
||||
context: context, |
||||
callback: callback, |
||||
} |
||||
bucket := <-callback |
||||
object, err := bucket.Get(context, string(path)) |
||||
if err != nil { |
||||
http.Error(w, err.Error(), 404) |
||||
} else if object == nil { |
||||
http.Error(w, errors.New("Object not found").Error(), 404) |
||||
} else { |
||||
fmt.Fprintf(w, string(object)) |
||||
} |
||||
} |
||||
|
||||
type GatewayPutHandler struct { |
||||
config GatewayConfig |
||||
} |
||||
|
||||
func (handler GatewayPutHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { |
||||
vars := mux.Vars(req) |
||||
bucketName := vars["bucket"] |
||||
path := vars["path"] |
||||
object, _ := ioutil.ReadAll(req.Body) |
||||
context := fakeContext{} |
||||
callback := make(chan Bucket) |
||||
handler.config.requestBucketChan <- BucketRequest{ |
||||
name: bucketName, |
||||
context: context, |
||||
callback: callback, |
||||
} |
||||
bucket := <-callback |
||||
bucket.Put(context, path, object) |
||||
} |
||||
|
||||
func RegisterGatewayHandlers(router *mux.Router, config GatewayConfig) { |
||||
config.requestBucketChan = make(chan BucketRequest) |
||||
go config.BucketDriver(config) |
||||
getHandler := GatewayGetHandler{config} |
||||
putHandler := GatewayPutHandler{config} |
||||
router.Handle("/{bucket}/{path:.*}", getHandler).Methods("GET") |
||||
router.Handle("/{bucket}/{path:.*}", putHandler).Methods("PUT") |
||||
} |
||||
|
||||
func SynchronizedBucketDriver(config GatewayConfig) { |
||||
buckets := make(map[string]*SynchronizedBucket) |
||||
for request := range config.requestBucketChan { |
||||
if buckets[request.name] == nil { |
||||
bucketChannel := make(chan ObjectRequest) |
||||
go config.StorageDriver(request.name, bucketChannel, config) |
||||
buckets[request.name] = &SynchronizedBucket{ |
||||
name: request.name, |
||||
channel: bucketChannel, |
||||
} |
||||
} |
||||
request.callback <- buckets[request.name] |
||||
} |
||||
for key := range buckets { |
||||
buckets[key].closeChannel() |
||||
} |
||||
} |
||||
|
||||
type SynchronizedBucket struct { |
||||
name string |
||||
channel chan ObjectRequest |
||||
objects map[string][]byte |
||||
} |
||||
|
||||
type ObjectRequest struct { |
||||
requestType string |
||||
path string |
||||
object []byte |
||||
callback chan interface{} |
||||
} |
||||
|
||||
func (bucket SynchronizedBucket) GetName(context Context) string { |
||||
return bucket.name |
||||
} |
||||
|
||||
func (bucket SynchronizedBucket) Get(context Context, path string) ([]byte, error) { |
||||
callback := make(chan interface{}) |
||||
bucket.channel <- ObjectRequest{ |
||||
requestType: "GET", |
||||
path: path, |
||||
callback: callback, |
||||
} |
||||
response := <-callback |
||||
|
||||
switch response.(type) { |
||||
case error: |
||||
return nil, response.(error) |
||||
case nil: |
||||
return nil, errors.New("Object not found") |
||||
case interface{}: |
||||
return response.([]byte), nil |
||||
default: |
||||
return nil, errors.New("Unexpected error, service failed") |
||||
} |
||||
} |
||||
|
||||
func (bucket SynchronizedBucket) Put(context Context, path string, object []byte) error { |
||||
callback := make(chan interface{}) |
||||
bucket.channel <- ObjectRequest{ |
||||
requestType: "PUT", |
||||
path: path, |
||||
object: object, |
||||
callback: callback, |
||||
} |
||||
switch response := <-callback; response.(type) { |
||||
case error: |
||||
return response.(error) |
||||
case nil: |
||||
return nil |
||||
default: |
||||
return errors.New("Unexpected error, service failed") |
||||
} |
||||
} |
||||
|
||||
func (bucket *SynchronizedBucket) closeChannel() { |
||||
close(bucket.channel) |
||||
} |
||||
|
||||
func InMemoryStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) { |
||||
objects := patricia.NewTrie() |
||||
for request := range input { |
||||
prefix := patricia.Prefix(request.path) |
||||
switch request.requestType { |
||||
case "GET": |
||||
request.callback <- objects.Get(prefix) |
||||
case "PUT": |
||||
objects.Insert(prefix, request.object) |
||||
request.callback <- nil |
||||
default: |
||||
request.callback <- errors.New("Unexpected message") |
||||
} |
||||
} |
||||
} |
||||
|
||||
func SimpleEncodedStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) { |
||||
eStorage, _ := encodedstorage.NewStorage(config.DataDir, config.K, config.M, config.BlockSize) |
||||
for request := range input { |
||||
switch request.requestType { |
||||
case "GET": |
||||
objectPath := path.Join(bucket, request.path) |
||||
object, err := eStorage.Get(objectPath) |
||||
if err != nil { |
||||
request.callback <- err |
||||
} else { |
||||
request.callback <- object |
||||
} |
||||
case "PUT": |
||||
objectPath := path.Join(bucket, request.path) |
||||
err := eStorage.Put(objectPath, bytes.NewBuffer(request.object)) |
||||
if err != nil { |
||||
request.callback <- err |
||||
} else { |
||||
request.callback <- nil |
||||
} |
||||
default: |
||||
request.callback <- errors.New("Unexpected message") |
||||
} |
||||
} |
||||
} |
||||
|
||||
//func SimpleFileStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) {
|
||||
// fileStorage, _ := fsstorage.NewStorage(config.DataDir, config.BlockSize)
|
||||
// for request := range input {
|
||||
// switch request.requestType {
|
||||
// case "GET":
|
||||
// objectPath := path.Join(bucket, request.path)
|
||||
// object, err := fileStorage.Get(objectPath)
|
||||
// if err != nil {
|
||||
// request.callback <- nil
|
||||
// } else {
|
||||
// request.callback <- object
|
||||
// }
|
||||
// case "PUT":
|
||||
// objectPath := path.Join(bucket, request.path)
|
||||
// fileStorage.Put(objectPath, bytes.NewBuffer(request.object))
|
||||
// request.callback <- nil
|
||||
// default:
|
||||
// request.callback <- errors.New("Unexpected message")
|
||||
// }
|
||||
// }
|
||||
//}
|
@ -1,158 +0,0 @@ |
||||
package server |
||||
|
||||
import ( |
||||
"io/ioutil" |
||||
"net/http" |
||||
"net/http/httptest" |
||||
"os" |
||||
"strings" |
||||
|
||||
"github.com/gorilla/mux" |
||||
"github.com/minio-io/minio/pkg/utils" |
||||
. "gopkg.in/check.v1" |
||||
) |
||||
|
||||
type GatewaySuite struct{} |
||||
|
||||
var _ = Suite(&GatewaySuite{}) |
||||
|
||||
func (s *GatewaySuite) TestPrintsGateway(c *C) { |
||||
// set up router with in memory storage driver
|
||||
router := mux.NewRouter() |
||||
config := GatewayConfig{ |
||||
StorageDriver: InMemoryStorageDriver, |
||||
BucketDriver: SynchronizedBucketDriver, |
||||
} |
||||
RegisterGatewayHandlers(router, config) |
||||
server := httptest.NewServer(router) |
||||
defer server.Close() |
||||
|
||||
// GET request, empty
|
||||
getReq1, _ := http.NewRequest("GET", server.URL+"/one/two/three", nil) |
||||
client := &http.Client{} |
||||
resp, err := client.Do(getReq1) |
||||
c.Assert(resp.StatusCode, Equals, 404) |
||||
c.Assert(err, IsNil) |
||||
|
||||
// assert object not found response
|
||||
body, _ := ioutil.ReadAll(resp.Body) |
||||
c.Assert(string(body), Equals, "Object not found\n") |
||||
c.Assert(err, IsNil) |
||||
|
||||
// add new object
|
||||
putReq, _ := http.NewRequest("PUT", server.URL+"/one/two/three", strings.NewReader("hello")) |
||||
resp, err = client.Do(putReq) |
||||
c.Assert(resp.StatusCode, Equals, 200) |
||||
c.Assert(err, IsNil) |
||||
|
||||
// verify object exists
|
||||
getReq2, _ := http.NewRequest("GET", server.URL+"/one/two/three", strings.NewReader("hello")) |
||||
resp, err = client.Do(getReq2) |
||||
c.Assert(resp.StatusCode, Equals, 200) |
||||
c.Assert(err, IsNil) |
||||
|
||||
// verify object's contents
|
||||
body2, _ := ioutil.ReadAll(resp.Body) |
||||
c.Assert(string(body2), Equals, "hello") |
||||
c.Assert(err, IsNil) |
||||
} |
||||
|
||||
type TestContext struct{} |
||||
|
||||
func (s *GatewaySuite) TestBucketCreation(c *C) { |
||||
config := GatewayConfig{ |
||||
StorageDriver: InMemoryStorageDriver, |
||||
requestBucketChan: make(chan BucketRequest), |
||||
} |
||||
defer close(config.requestBucketChan) |
||||
go SynchronizedBucketDriver(config) |
||||
context := TestContext{} |
||||
|
||||
// get new bucket A
|
||||
var bucketA1 Bucket |
||||
callback := make(chan Bucket) |
||||
config.requestBucketChan <- BucketRequest{ |
||||
name: "bucketA", |
||||
context: context, |
||||
callback: callback, |
||||
} |
||||
bucketA1 = <-callback |
||||
c.Assert(bucketA1.GetName(context), Equals, "bucketA") |
||||
|
||||
// get bucket A again
|
||||
var bucketA2 Bucket |
||||
callback = make(chan Bucket) |
||||
config.requestBucketChan <- BucketRequest{ |
||||
name: "bucketA", |
||||
context: context, |
||||
callback: callback, |
||||
} |
||||
bucketA2 = <-callback |
||||
c.Assert(bucketA2.GetName(context), Equals, "bucketA") |
||||
c.Assert(bucketA1, DeepEquals, bucketA2) |
||||
|
||||
// get new bucket B
|
||||
var bucketB Bucket |
||||
callback = make(chan Bucket) |
||||
config.requestBucketChan <- BucketRequest{ |
||||
name: "bucketB", |
||||
context: context, |
||||
callback: callback, |
||||
} |
||||
bucketB = <-callback |
||||
c.Assert(bucketB.GetName(context), Equals, "bucketB") |
||||
} |
||||
|
||||
func (s *GatewaySuite) TestInMemoryBucketOperations(c *C) { |
||||
simpleFileStorageRootDir, err1 := utils.MakeTempTestDir() |
||||
c.Assert(err1, IsNil) |
||||
simpleEncodedStorageRootDir, err2 := utils.MakeTempTestDir() |
||||
c.Assert(err2, IsNil) |
||||
defer os.RemoveAll(simpleFileStorageRootDir) |
||||
defer os.RemoveAll(simpleEncodedStorageRootDir) |
||||
|
||||
configs := []GatewayConfig{ |
||||
GatewayConfig{ |
||||
StorageDriver: InMemoryStorageDriver, |
||||
requestBucketChan: make(chan BucketRequest), |
||||
}, |
||||
GatewayConfig{ |
||||
StorageDriver: SimpleEncodedStorageDriver, |
||||
requestBucketChan: make(chan BucketRequest), |
||||
DataDir: simpleEncodedStorageRootDir, |
||||
K: 10, |
||||
M: 6, |
||||
BlockSize: 1024 * 1024, |
||||
}, |
||||
} |
||||
for _, config := range configs { |
||||
defer close(config.requestBucketChan) |
||||
go SynchronizedBucketDriver(config) |
||||
context := TestContext{} |
||||
|
||||
// get bucket
|
||||
callback := make(chan Bucket) |
||||
config.requestBucketChan <- BucketRequest{ |
||||
name: "bucket", |
||||
context: context, |
||||
callback: callback, |
||||
} |
||||
bucket := <-callback |
||||
c.Assert(bucket.GetName(context), Equals, "bucket") |
||||
|
||||
// get missing value
|
||||
nilResult, err := bucket.Get(context, "foo") |
||||
c.Assert(nilResult, IsNil) |
||||
c.Assert(err, Not(IsNil)) |
||||
c.Assert(err.Error(), Equals, "Object not found") |
||||
|
||||
// add new value
|
||||
err = bucket.Put(context, "foo", []byte("bar")) |
||||
c.Assert(err, IsNil) |
||||
|
||||
// retrieve value
|
||||
barResult, err := bucket.Get(context, "foo") |
||||
c.Assert(err, IsNil) |
||||
c.Assert(string(barResult), Equals, "bar") |
||||
} |
||||
} |
Loading…
Reference in new issue