diff --git a/pkg/server/server.go b/pkg/server/server.go deleted file mode 100644 index 003b84722..000000000 --- a/pkg/server/server.go +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Mini Object Storage, (C) 2014 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package server - -import ( - "bytes" - "errors" - "fmt" - "io/ioutil" - "net/http" - "path" - - "github.com/gorilla/mux" - "github.com/minio-io/minio/pkg/storage/encodedstorage" - "github.com/tchap/go-patricia/patricia" -) - -// Stores system configuration, populated from CLI or test runner -type GatewayConfig struct { - StorageDriver StorageDriver - BucketDriver BucketDriver - requestBucketChan chan BucketRequest - DataDir string - K, - M int - BlockSize uint64 -} - -// Message for requesting a bucket -type BucketRequest struct { - name string - context Context - callback chan Bucket -} - -// Context interface for security and session information -type Context interface{} - -// Bucket definition -type Bucket interface { - GetName(Context) string - Get(Context, string) ([]byte, error) - Put(Context, string, []byte) error -} - -// Bucket driver function, should read from a channel and respond through callback channels -type BucketDriver func(config GatewayConfig) - -// Storage driver function, should read from a channel and respond through callback channels -type StorageDriver func(bucket string, input chan ObjectRequest, config GatewayConfig) - -// TODO remove when building real context -type fakeContext struct{} - -type GatewayGetHandler struct { - config GatewayConfig -} - -// GET requests server -func (handler GatewayGetHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - bucketName := vars["bucket"] - path := vars["path"] - context := fakeContext{} - callback := make(chan Bucket) - handler.config.requestBucketChan <- BucketRequest{ - name: bucketName, - context: context, - callback: callback, - } - bucket := <-callback - object, err := bucket.Get(context, string(path)) - if err != nil { - http.Error(w, err.Error(), 404) - } else if object == nil { - http.Error(w, errors.New("Object not found").Error(), 404) - } else { - fmt.Fprintf(w, string(object)) - } -} - -type GatewayPutHandler struct { - config GatewayConfig -} - -func (handler GatewayPutHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - bucketName := vars["bucket"] - path := vars["path"] - object, _ := ioutil.ReadAll(req.Body) - context := fakeContext{} - callback := make(chan Bucket) - handler.config.requestBucketChan <- BucketRequest{ - name: bucketName, - context: context, - callback: callback, - } - bucket := <-callback - bucket.Put(context, path, object) -} - -func RegisterGatewayHandlers(router *mux.Router, config GatewayConfig) { - config.requestBucketChan = make(chan BucketRequest) - go config.BucketDriver(config) - getHandler := GatewayGetHandler{config} - putHandler := GatewayPutHandler{config} - router.Handle("/{bucket}/{path:.*}", getHandler).Methods("GET") - router.Handle("/{bucket}/{path:.*}", putHandler).Methods("PUT") -} - -func SynchronizedBucketDriver(config GatewayConfig) { - buckets := make(map[string]*SynchronizedBucket) - for request := range config.requestBucketChan { - if buckets[request.name] == nil { - bucketChannel := make(chan ObjectRequest) - go config.StorageDriver(request.name, bucketChannel, config) - buckets[request.name] = &SynchronizedBucket{ - name: request.name, - channel: bucketChannel, - } - } - request.callback <- buckets[request.name] - } - for key := range buckets { - buckets[key].closeChannel() - } -} - -type SynchronizedBucket struct { - name string - channel chan ObjectRequest - objects map[string][]byte -} - -type ObjectRequest struct { - requestType string - path string - object []byte - callback chan interface{} -} - -func (bucket SynchronizedBucket) GetName(context Context) string { - return bucket.name -} - -func (bucket SynchronizedBucket) Get(context Context, path string) ([]byte, error) { - callback := make(chan interface{}) - bucket.channel <- ObjectRequest{ - requestType: "GET", - path: path, - callback: callback, - } - response := <-callback - - switch response.(type) { - case error: - return nil, response.(error) - case nil: - return nil, errors.New("Object not found") - case interface{}: - return response.([]byte), nil - default: - return nil, errors.New("Unexpected error, service failed") - } -} - -func (bucket SynchronizedBucket) Put(context Context, path string, object []byte) error { - callback := make(chan interface{}) - bucket.channel <- ObjectRequest{ - requestType: "PUT", - path: path, - object: object, - callback: callback, - } - switch response := <-callback; response.(type) { - case error: - return response.(error) - case nil: - return nil - default: - return errors.New("Unexpected error, service failed") - } -} - -func (bucket *SynchronizedBucket) closeChannel() { - close(bucket.channel) -} - -func InMemoryStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) { - objects := patricia.NewTrie() - for request := range input { - prefix := patricia.Prefix(request.path) - switch request.requestType { - case "GET": - request.callback <- objects.Get(prefix) - case "PUT": - objects.Insert(prefix, request.object) - request.callback <- nil - default: - request.callback <- errors.New("Unexpected message") - } - } -} - -func SimpleEncodedStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) { - eStorage, _ := encodedstorage.NewStorage(config.DataDir, config.K, config.M, config.BlockSize) - for request := range input { - switch request.requestType { - case "GET": - objectPath := path.Join(bucket, request.path) - object, err := eStorage.Get(objectPath) - if err != nil { - request.callback <- err - } else { - request.callback <- object - } - case "PUT": - objectPath := path.Join(bucket, request.path) - err := eStorage.Put(objectPath, bytes.NewBuffer(request.object)) - if err != nil { - request.callback <- err - } else { - request.callback <- nil - } - default: - request.callback <- errors.New("Unexpected message") - } - } -} - -//func SimpleFileStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) { -// fileStorage, _ := fsstorage.NewStorage(config.DataDir, config.BlockSize) -// for request := range input { -// switch request.requestType { -// case "GET": -// objectPath := path.Join(bucket, request.path) -// object, err := fileStorage.Get(objectPath) -// if err != nil { -// request.callback <- nil -// } else { -// request.callback <- object -// } -// case "PUT": -// objectPath := path.Join(bucket, request.path) -// fileStorage.Put(objectPath, bytes.NewBuffer(request.object)) -// request.callback <- nil -// default: -// request.callback <- errors.New("Unexpected message") -// } -// } -//} diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go deleted file mode 100644 index 8433ed9a3..000000000 --- a/pkg/server/server_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package server - -import ( - "io/ioutil" - "net/http" - "net/http/httptest" - "os" - "strings" - - "github.com/gorilla/mux" - "github.com/minio-io/minio/pkg/utils" - . "gopkg.in/check.v1" -) - -type GatewaySuite struct{} - -var _ = Suite(&GatewaySuite{}) - -func (s *GatewaySuite) TestPrintsGateway(c *C) { - // set up router with in memory storage driver - router := mux.NewRouter() - config := GatewayConfig{ - StorageDriver: InMemoryStorageDriver, - BucketDriver: SynchronizedBucketDriver, - } - RegisterGatewayHandlers(router, config) - server := httptest.NewServer(router) - defer server.Close() - - // GET request, empty - getReq1, _ := http.NewRequest("GET", server.URL+"/one/two/three", nil) - client := &http.Client{} - resp, err := client.Do(getReq1) - c.Assert(resp.StatusCode, Equals, 404) - c.Assert(err, IsNil) - - // assert object not found response - body, _ := ioutil.ReadAll(resp.Body) - c.Assert(string(body), Equals, "Object not found\n") - c.Assert(err, IsNil) - - // add new object - putReq, _ := http.NewRequest("PUT", server.URL+"/one/two/three", strings.NewReader("hello")) - resp, err = client.Do(putReq) - c.Assert(resp.StatusCode, Equals, 200) - c.Assert(err, IsNil) - - // verify object exists - getReq2, _ := http.NewRequest("GET", server.URL+"/one/two/three", strings.NewReader("hello")) - resp, err = client.Do(getReq2) - c.Assert(resp.StatusCode, Equals, 200) - c.Assert(err, IsNil) - - // verify object's contents - body2, _ := ioutil.ReadAll(resp.Body) - c.Assert(string(body2), Equals, "hello") - c.Assert(err, IsNil) -} - -type TestContext struct{} - -func (s *GatewaySuite) TestBucketCreation(c *C) { - config := GatewayConfig{ - StorageDriver: InMemoryStorageDriver, - requestBucketChan: make(chan BucketRequest), - } - defer close(config.requestBucketChan) - go SynchronizedBucketDriver(config) - context := TestContext{} - - // get new bucket A - var bucketA1 Bucket - callback := make(chan Bucket) - config.requestBucketChan <- BucketRequest{ - name: "bucketA", - context: context, - callback: callback, - } - bucketA1 = <-callback - c.Assert(bucketA1.GetName(context), Equals, "bucketA") - - // get bucket A again - var bucketA2 Bucket - callback = make(chan Bucket) - config.requestBucketChan <- BucketRequest{ - name: "bucketA", - context: context, - callback: callback, - } - bucketA2 = <-callback - c.Assert(bucketA2.GetName(context), Equals, "bucketA") - c.Assert(bucketA1, DeepEquals, bucketA2) - - // get new bucket B - var bucketB Bucket - callback = make(chan Bucket) - config.requestBucketChan <- BucketRequest{ - name: "bucketB", - context: context, - callback: callback, - } - bucketB = <-callback - c.Assert(bucketB.GetName(context), Equals, "bucketB") -} - -func (s *GatewaySuite) TestInMemoryBucketOperations(c *C) { - simpleFileStorageRootDir, err1 := utils.MakeTempTestDir() - c.Assert(err1, IsNil) - simpleEncodedStorageRootDir, err2 := utils.MakeTempTestDir() - c.Assert(err2, IsNil) - defer os.RemoveAll(simpleFileStorageRootDir) - defer os.RemoveAll(simpleEncodedStorageRootDir) - - configs := []GatewayConfig{ - GatewayConfig{ - StorageDriver: InMemoryStorageDriver, - requestBucketChan: make(chan BucketRequest), - }, - GatewayConfig{ - StorageDriver: SimpleEncodedStorageDriver, - requestBucketChan: make(chan BucketRequest), - DataDir: simpleEncodedStorageRootDir, - K: 10, - M: 6, - BlockSize: 1024 * 1024, - }, - } - for _, config := range configs { - defer close(config.requestBucketChan) - go SynchronizedBucketDriver(config) - context := TestContext{} - - // get bucket - callback := make(chan Bucket) - config.requestBucketChan <- BucketRequest{ - name: "bucket", - context: context, - callback: callback, - } - bucket := <-callback - c.Assert(bucket.GetName(context), Equals, "bucket") - - // get missing value - nilResult, err := bucket.Get(context, "foo") - c.Assert(nilResult, IsNil) - c.Assert(err, Not(IsNil)) - c.Assert(err.Error(), Equals, "Object not found") - - // add new value - err = bucket.Put(context, "foo", []byte("bar")) - c.Assert(err, IsNil) - - // retrieve value - barResult, err := bucket.Get(context, "foo") - c.Assert(err, IsNil) - c.Assert(string(barResult), Equals, "bar") - } -}