Adding fs storage

master
Frederick F. Kautz IV 10 years ago
parent 3461e7e300
commit 163a6c35db
  1. 201
      pkg/storage/fs/fs.go
  2. 42
      pkg/storage/fs/fs_test.go
  3. 29
      pkg/storage/storage_api_suite.go

@ -0,0 +1,201 @@
package fs
import (
"errors"
"io"
"io/ioutil"
"os"
"path"
"strings"
"sync"
mstorage "github.com/minio-io/minio/pkg/storage"
)
type storage struct {
root string
writeLock sync.Mutex
}
type MkdirFailedError struct{}
func (self MkdirFailedError) Error() string {
return "Mkdir Failed"
}
func Start(root string) (chan<- string, <-chan error, *storage) {
ctrlChannel := make(chan string)
errorChannel := make(chan error)
go start(ctrlChannel, errorChannel)
return ctrlChannel, errorChannel, &storage{root: root}
}
func start(ctrlChannel <-chan string, errorChannel chan<- error) {
close(errorChannel)
}
// Bucket Operaotions
func (storage *storage) ListBuckets(prefix string) ([]mstorage.BucketMetadata, error) {
return []mstorage.BucketMetadata{}, errors.New("Not Implemented")
}
func (storage *storage) StoreBucket(bucket string) error {
storage.writeLock.Lock()
defer storage.writeLock.Unlock()
// verify bucket path legal
if mstorage.IsValidBucket(bucket) == false {
return mstorage.BucketNameInvalid{Bucket: bucket}
}
// get bucket path
bucketDir := path.Join(storage.root, bucket)
// check if bucket exists
if _, err := os.Stat(bucketDir); err == nil {
return mstorage.BucketExists{
Bucket: bucket,
}
}
// make bucket
err := os.Mkdir(bucketDir, 0700)
if err != nil {
return mstorage.EmbedError(bucket, "", err)
}
return nil
}
// Object Operations
func (storage *storage) CopyObjectToWriter(w io.Writer, bucket string, object string) (int64, error) {
// validate bucket
if mstorage.IsValidBucket(bucket) == false {
return 0, mstorage.BucketNameInvalid{Bucket: bucket}
}
// validate object
if mstorage.IsValidObject(object) == false {
return 0, mstorage.ObjectNameInvalid{Bucket: bucket, Object: object}
}
objectPath := path.Join(storage.root, bucket, object)
file, err := os.Open(objectPath)
if err != nil {
return 0, mstorage.EmbedError(bucket, object, err)
}
count, err := io.Copy(w, file)
if err != nil {
return count, mstorage.EmbedError(bucket, object, err)
}
return count, nil
}
func (storage *storage) GetObjectMetadata(bucket string, object string) (mstorage.ObjectMetadata, error) {
if mstorage.IsValidBucket(bucket) == false {
return mstorage.ObjectMetadata{}, mstorage.BucketNameInvalid{Bucket: bucket}
}
if mstorage.IsValidObject(bucket) == false {
return mstorage.ObjectMetadata{}, mstorage.ObjectNameInvalid{Bucket: bucket, Object: bucket}
}
objectPath := path.Join(storage.root, bucket, object)
stat, err := os.Stat(objectPath)
if os.IsNotExist(err) {
return mstorage.ObjectMetadata{}, mstorage.ObjectNotFound{Bucket: bucket, Object: object}
}
metadata := mstorage.ObjectMetadata{
Bucket: bucket,
Key: object,
Created: stat.ModTime(),
Size: stat.Size(),
ETag: bucket + "#" + object,
}
return metadata, nil
}
func (storage *storage) ListObjects(bucket, prefix string, count int) ([]mstorage.ObjectMetadata, bool, error) {
if mstorage.IsValidBucket(bucket) == false {
return []mstorage.ObjectMetadata{}, false, mstorage.BucketNameInvalid{Bucket: bucket}
}
if mstorage.IsValidObject(prefix) == false {
return []mstorage.ObjectMetadata{}, false, mstorage.ObjectNameInvalid{Bucket: bucket, Object: prefix}
}
rootPrefix := path.Join(storage.root, bucket)
files, err := ioutil.ReadDir(rootPrefix)
if err != nil {
return []mstorage.ObjectMetadata{}, false, mstorage.EmbedError("bucket", "", err)
}
var metadataList []mstorage.ObjectMetadata
for _, file := range files {
if len(metadataList) >= count {
return metadataList, true, nil
}
if strings.HasPrefix(file.Name(), prefix) {
metadata := mstorage.ObjectMetadata{
Bucket: bucket,
Key: file.Name(),
Created: file.ModTime(),
Size: file.Size(),
ETag: bucket + "#" + file.Name(),
}
metadataList = append(metadataList, metadata)
}
}
return metadataList, false, nil
}
func (storage *storage) StoreObject(bucket string, key string, data io.Reader) error {
// TODO Commits should stage then move instead of writing directly
storage.writeLock.Lock()
defer storage.writeLock.Unlock()
// check bucket name valid
if mstorage.IsValidBucket(bucket) == false {
return mstorage.BucketNameInvalid{Bucket: bucket}
}
// check bucket exists
if _, err := os.Stat(path.Join(storage.root, bucket)); os.IsNotExist(err) {
return mstorage.BucketNotFound{Bucket: bucket}
}
// verify object path legal
if mstorage.IsValidObject(key) == false {
return mstorage.ObjectNameInvalid{Bucket: bucket, Object: key}
}
// get object path
objectPath := path.Join(storage.root, bucket, key)
// check if object exists
if _, err := os.Stat(objectPath); !os.IsNotExist(err) {
return mstorage.ObjectExists{
Bucket: bucket,
Key: key,
}
}
// write object
file, err := os.OpenFile(objectPath, os.O_WRONLY|os.O_CREATE, 0600)
defer file.Close()
if err != nil {
return mstorage.EmbedError(bucket, key, err)
}
_, err = io.Copy(file, data)
if err != nil {
return mstorage.EmbedError(bucket, key, err)
}
return nil
}

@ -0,0 +1,42 @@
package fs
import (
"io/ioutil"
"log"
"os"
"testing"
mstorage "github.com/minio-io/minio/pkg/storage"
. "gopkg.in/check.v1"
)
func Test(t *testing.T) { TestingT(t) }
type MySuite struct{}
var _ = Suite(&MySuite{})
func (s *MySuite) TestAPISuite(c *C) {
var storageList []string
create := func() mstorage.Storage {
path, err := ioutil.TempDir(os.TempDir(), "minio-fs-")
log.Println(path)
c.Check(err, IsNil)
storageList = append(storageList, path)
_, _, store := Start(path)
return store
}
log.Println("FOO")
mstorage.APITestSuite(c, create)
log.Println("BAR")
removeRoots(c, storageList)
}
func removeRoots(c *C, roots []string) {
log.Println("REMOVING ROOTS: ", roots)
for _, root := range roots {
err := os.RemoveAll(root)
c.Check(err, IsNil)
}
}

@ -11,10 +11,10 @@ import (
func APITestSuite(c *C, create func() Storage) {
testCreateBucket(c, create)
testMultipleObjectCreation(c, create)
//testPaging(c, create)
//testObjectOverwriteFails(c, create)
//testNonExistantBucketOperations(c, create)
//testBucketRecreateFails(c, create)
testPaging(c, create)
testObjectOverwriteFails(c, create)
testNonExistantBucketOperations(c, create)
testBucketRecreateFails(c, create)
}
func testCreateBucket(c *C, create func() Storage) {
@ -58,24 +58,24 @@ func testMultipleObjectCreation(c *C, create func() Storage) {
func testPaging(c *C, create func() Storage) {
storage := create()
storage.StoreBucket("bucket")
storage.ListObjects("bucket", "", 1000)
objects, isTruncated, err := storage.ListObjects("bucket", "", 1000)
storage.ListObjects("bucket", "", 5)
objects, isTruncated, err := storage.ListObjects("bucket", "", 5)
c.Check(len(objects), Equals, 0)
c.Check(isTruncated, Equals, false)
c.Check(err, IsNil)
for i := 1; i <= 1000; i++ {
for i := 1; i <= 5; i++ {
key := "obj" + strconv.Itoa(i)
storage.StoreObject("bucket", key, bytes.NewBufferString(key))
objects, isTruncated, err = storage.ListObjects("bucket", "", 1000)
objects, isTruncated, err = storage.ListObjects("bucket", "", 5)
c.Check(len(objects), Equals, i)
c.Check(isTruncated, Equals, false)
c.Check(err, IsNil)
}
for i := 1001; i <= 2000; i++ {
for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i)
storage.StoreObject("bucket", key, bytes.NewBufferString(key))
objects, isTruncated, err = storage.ListObjects("bucket", "", 1000)
c.Check(len(objects), Equals, 1000)
objects, isTruncated, err = storage.ListObjects("bucket", "", 5)
c.Check(len(objects), Equals, 5)
c.Check(isTruncated, Equals, true)
c.Check(err, IsNil)
}
@ -86,8 +86,13 @@ func testObjectOverwriteFails(c *C, create func() Storage) {
storage.StoreBucket("bucket")
err := storage.StoreObject("bucket", "object", bytes.NewBufferString("one"))
c.Check(err, IsNil)
err = storage.StoreObject("bucket", "object", bytes.NewBufferString("one"))
err = storage.StoreObject("bucket", "object", bytes.NewBufferString("three"))
c.Check(err, Not(IsNil))
var bytesBuffer bytes.Buffer
length, err := storage.CopyObjectToWriter(&bytesBuffer, "bucket", "object")
c.Check(length, Equals, int64(len("one")))
c.Check(err, IsNil)
c.Check(string(bytesBuffer.Bytes()), Equals, "one")
}
func testNonExistantBucketOperations(c *C, create func() Storage) {

Loading…
Cancel
Save