Merge pull request #26 from fkautz/pr_out_adding_simple_file_storage_driver_for_persistent_storage

Adding simple file storage driver for persistent storage
master
Harshavardhana 10 years ago
commit d476a9d87e
  1. 22
      file_storage.go
  2. 51
      file_storage_test.go
  3. 35
      gateway.go
  4. 86
      gateway_test.go
  5. 8
      setup_test.go
  6. 5
      storage.go

@ -0,0 +1,22 @@
package minio
import (
"io/ioutil"
"os"
"path"
"path/filepath"
)
type FileStorage struct {
RootDir string
}
func (storage FileStorage) Get(objectPath string) ([]byte, error) {
return ioutil.ReadFile(path.Join(storage.RootDir, objectPath))
}
func (storage FileStorage) Put(objectPath string, object []byte) error {
os.MkdirAll(filepath.Dir(path.Join(storage.RootDir, objectPath)), 0700)
return ioutil.WriteFile(path.Join(storage.RootDir, objectPath), object, 0600)
}

@ -0,0 +1,51 @@
package minio
import (
. "gopkg.in/check.v1"
"io/ioutil"
"os"
)
type FileStorageSuite struct{}
var _ = Suite(&FileStorageSuite{})
func makeTempTestDir() (string, error) {
return ioutil.TempDir("/tmp", "minio-test-")
}
func (s *FileStorageSuite) TestFileStoragePutAtRootPath(c *C) {
rootDir, err := makeTempTestDir()
c.Assert(err, IsNil)
defer os.RemoveAll(rootDir)
var storage ObjectStorage
storage = FileStorage{
RootDir: rootDir,
}
storage.Put("path1", []byte("object1"))
// assert object1 was created in correct path
object1, err := storage.Get("path1")
c.Assert(err, IsNil)
c.Assert(string(object1), Equals, "object1")
}
func (s *FileStorageSuite) TestFileStoragePutDirPath(c *C) {
rootDir, err := makeTempTestDir()
c.Assert(err, IsNil)
defer os.RemoveAll(rootDir)
var storage ObjectStorage
storage = FileStorage{
RootDir: rootDir,
}
storage.Put("path1/path2/path3", []byte("object"))
// assert object1 was created in correct path
object1, err := storage.Get("path1/path2/path3")
c.Assert(err, IsNil)
c.Assert(string(object1), Equals, "object")
}

@ -7,6 +7,7 @@ import (
"github.com/tchap/go-patricia/patricia" "github.com/tchap/go-patricia/patricia"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"path"
) )
// Stores system configuration, populated from CLI or test runner // Stores system configuration, populated from CLI or test runner
@ -14,6 +15,7 @@ type GatewayConfig struct {
StorageDriver StorageDriver StorageDriver StorageDriver
BucketDriver BucketDriver BucketDriver BucketDriver
requestBucketChan chan BucketRequest requestBucketChan chan BucketRequest
dataDir string
} }
// Message for requesting a bucket // Message for requesting a bucket
@ -37,7 +39,7 @@ type Bucket interface {
type BucketDriver func(config GatewayConfig) type BucketDriver func(config GatewayConfig)
// Storage driver function, should read from a channel and respond through callback channels // Storage driver function, should read from a channel and respond through callback channels
type StorageDriver func(bucket string, input chan ObjectRequest) type StorageDriver func(bucket string, input chan ObjectRequest, config GatewayConfig)
// TODO remove when building real context // TODO remove when building real context
type fakeContext struct{} type fakeContext struct{}
@ -103,7 +105,7 @@ func SynchronizedBucketDriver(config GatewayConfig) {
for request := range config.requestBucketChan { for request := range config.requestBucketChan {
if buckets[request.name] == nil { if buckets[request.name] == nil {
bucketChannel := make(chan ObjectRequest) bucketChannel := make(chan ObjectRequest)
go config.StorageDriver(request.name, bucketChannel) go config.StorageDriver(request.name, bucketChannel, config)
buckets[request.name] = &SynchronizedBucket{ buckets[request.name] = &SynchronizedBucket{
name: request.name, name: request.name,
channel: bucketChannel, channel: bucketChannel,
@ -176,17 +178,14 @@ func (bucket *SynchronizedBucket) closeChannel() {
close(bucket.channel) close(bucket.channel)
} }
func InMemoryStorageDriver(bucket string, input chan ObjectRequest) { func InMemoryStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) {
objects := patricia.NewTrie() objects := patricia.NewTrie()
for request := range input { for request := range input {
prefix := patricia.Prefix(request.path) prefix := patricia.Prefix(request.path)
fmt.Println("objects:", objects)
switch request.requestType { switch request.requestType {
case "GET": case "GET":
fmt.Println("GET: " + request.path)
request.callback <- objects.Get(prefix) request.callback <- objects.Get(prefix)
case "PUT": case "PUT":
fmt.Println("PUT: " + request.path)
objects.Insert(prefix, request.object) objects.Insert(prefix, request.object)
request.callback <- nil request.callback <- nil
default: default:
@ -194,3 +193,27 @@ func InMemoryStorageDriver(bucket string, input chan ObjectRequest) {
} }
} }
} }
func SimpleFileStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) {
storage := FileStorage{
RootDir: config.dataDir,
}
for request := range input {
switch request.requestType {
case "GET":
objectPath := path.Join(bucket, request.path)
object, err := storage.Get(objectPath)
if err != nil {
request.callback <- nil
} else {
request.callback <- object
}
case "PUT":
objectPath := path.Join(bucket, request.path)
storage.Put(objectPath, request.object)
request.callback <- nil
default:
request.callback <- errors.New("Unexpected message")
}
}
}

@ -6,17 +6,15 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os"
"strings" "strings"
"testing"
) )
type MySuite struct{} type GatewaySuite struct{}
var _ = Suite(&MySuite{}) var _ = Suite(&GatewaySuite{})
func Test(t *testing.T) { TestingT(t) } func (s *GatewaySuite) TestPrintsGateway(c *C) {
func (s *MySuite) TestPrintsGateway(c *C) {
// set up router with in memory storage driver // set up router with in memory storage driver
router := mux.NewRouter() router := mux.NewRouter()
config := GatewayConfig{ config := GatewayConfig{
@ -59,7 +57,7 @@ func (s *MySuite) TestPrintsGateway(c *C) {
type TestContext struct{} type TestContext struct{}
func (s *MySuite) TestBucketCreation(c *C) { func (s *GatewaySuite) TestBucketCreation(c *C) {
config := GatewayConfig{ config := GatewayConfig{
StorageDriver: InMemoryStorageDriver, StorageDriver: InMemoryStorageDriver,
requestBucketChan: make(chan BucketRequest), requestBucketChan: make(chan BucketRequest),
@ -103,38 +101,50 @@ func (s *MySuite) TestBucketCreation(c *C) {
c.Assert(bucketB.GetName(context), Equals, "bucketB") c.Assert(bucketB.GetName(context), Equals, "bucketB")
} }
func (s *MySuite) TestInMemoryBucketOperations(c *C) { func (s *GatewaySuite) TestInMemoryBucketOperations(c *C) {
// Test in memory bucket operations simpleFileStorageRootDir, err := makeTempTestDir()
config := GatewayConfig{ c.Assert(err, IsNil)
StorageDriver: InMemoryStorageDriver, defer os.RemoveAll(simpleFileStorageRootDir)
requestBucketChan: make(chan BucketRequest), configs := []GatewayConfig{
GatewayConfig{
StorageDriver: InMemoryStorageDriver,
requestBucketChan: make(chan BucketRequest),
},
GatewayConfig{
StorageDriver: SimpleFileStorageDriver,
requestBucketChan: make(chan BucketRequest),
dataDir: simpleFileStorageRootDir,
},
} }
defer close(config.requestBucketChan) for _, config := range configs {
go SynchronizedBucketDriver(config) defer close(config.requestBucketChan)
context := TestContext{} go SynchronizedBucketDriver(config)
context := TestContext{}
// get bucket
callback := make(chan Bucket)
config.requestBucketChan <- BucketRequest{
name: "bucket",
context: context,
callback: callback,
}
bucket := <-callback
c.Assert(bucket.GetName(context), Equals, "bucket")
// get missing value
nilResult, err := bucket.Get(context, "foo")
c.Assert(nilResult, IsNil)
c.Assert(err, Not(IsNil))
c.Assert(err.Error(), Equals, "Object not found")
// add new value
err = bucket.Put(context, "foo", []byte("bar"))
c.Assert(err, IsNil)
// retrieve value
barResult, err := bucket.Get(context, "foo")
c.Assert(err, IsNil)
c.Assert(string(barResult), Equals, "bar")
// get bucket
callback := make(chan Bucket)
config.requestBucketChan <- BucketRequest{
name: "bucket",
context: context,
callback: callback,
} }
bucket := <-callback
c.Assert(bucket.GetName(context), Equals, "bucket")
// get missing value
nilResult, err := bucket.Get(context, "foo")
c.Assert(nilResult, IsNil)
c.Assert(err, Not(IsNil))
c.Assert(err.Error(), Equals, "Object not found")
// add new value
err = bucket.Put(context, "foo", []byte("bar"))
c.Assert(err, IsNil)
// retrieve value
barResult, err := bucket.Get(context, "foo")
c.Assert(err, IsNil)
c.Assert(string(barResult), Equals, "bar")
} }

@ -0,0 +1,8 @@
package minio
import (
. "gopkg.in/check.v1"
"testing"
)
func Test(t *testing.T) { TestingT(t) }

@ -6,6 +6,11 @@ import (
"net/http" "net/http"
) )
type ObjectStorage interface {
Get(path string) ([]byte, error)
Put(path string, object []byte) error
}
func RegisterStorageHandlers(router *mux.Router) { func RegisterStorageHandlers(router *mux.Router) {
router.HandleFunc("/storage/rpc", StorageHandler) router.HandleFunc("/storage/rpc", StorageHandler)
} }

Loading…
Cancel
Save