Add list() object support for erasure and append storage drivers

- Reply back objects with their protectionlevel and md5sum
  - // TODO hash value
  - Calculate md5sum after "storeBlocks()", to make sure data is committed
master
Harshavardhana 10 years ago
parent 267aa87ad7
commit 0a0e1111cd
  1. 22
      cmd/erasure-demo/erasure.go
  2. 2
      cmd/erasure-demo/fs.go
  3. 15
      pkgs/storage/appendstorage/append_storage.go
  4. 21
      pkgs/storage/encodedstorage/encoded_storage.go
  5. 28
      pkgs/storage/encodedstorage/encoded_storage_test.go
  6. 10
      pkgs/storage/fsstorage/fs_storage.go
  7. 53
      pkgs/storage/fsstorage/fs_storage_test.go
  8. 9
      pkgs/storage/storage.go

@ -1,7 +1,8 @@
package main package main
import ( import (
"errors" "bytes"
"encoding/json"
"io" "io"
"os" "os"
"path" "path"
@ -11,8 +12,23 @@ import (
) )
func erasureGetList(config inputConfig) (io.Reader, error) { func erasureGetList(config inputConfig) (io.Reader, error) {
// do nothing var objectStorage storage.ObjectStorage
return nil, errors.New("Not Implemented") rootDir := path.Join(config.rootDir, config.storageDriver)
objectStorage, err := es.NewStorage(rootDir, config.k, config.m, config.blockSize)
if err != nil {
return nil, err
}
objectDescList, err := objectStorage.List()
if err != nil {
return nil, err
}
var objectDescListBytes []byte
if objectDescListBytes, err = json.Marshal(objectDescList); err != nil {
return nil, err
}
objectDescListBuffer := bytes.NewBuffer(objectDescListBytes)
return objectDescListBuffer, nil
} }
func erasureGet(config inputConfig, objectPath string) (io.Reader, error) { func erasureGet(config inputConfig, objectPath string) (io.Reader, error) {

@ -15,7 +15,7 @@ func fsGetList(config inputConfig) (io.Reader, error) {
var objectStorage storage.ObjectStorage var objectStorage storage.ObjectStorage
rootDir := path.Join(config.rootDir, config.storageDriver) rootDir := path.Join(config.rootDir, config.storageDriver)
objectStorage, _ = fsstorage.NewStorage(rootDir) objectStorage, _ = fsstorage.NewStorage(rootDir)
objectList, err := objectStorage.List("/") objectList, err := objectStorage.List()
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -117,6 +117,17 @@ func (aStorage *appendStorage) Put(objectPath string, object io.Reader) error {
return nil return nil
} }
func (aStorage *appendStorage) List(listPath string) ([]storage.ObjectDescription, error) { func (aStorage *appendStorage) List() ([]storage.ObjectDescription, error) {
return nil, errors.New("Not Implemented") var objectDescList []storage.ObjectDescription
for objectName, _ := range aStorage.objects {
var objectDescription storage.ObjectDescription
objectDescription.Name = objectName
objectDescription.Md5sum = ""
objectDescription.Protectionlevel = ""
objectDescList = append(objectDescList, objectDescription)
}
if len(objectDescList) == 0 {
return nil, errors.New("No objects found")
}
return objectDescList, nil
} }

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/md5" "crypto/md5"
"encoding/gob" "encoding/gob"
"encoding/hex"
"errors" "errors"
"io" "io"
"io/ioutil" "io/ioutil"
@ -93,8 +94,20 @@ func (eStorage *encodedStorage) Get(objectPath string) (io.Reader, error) {
return reader, nil return reader, nil
} }
func (eStorage *encodedStorage) List(listPath string) ([]storage.ObjectDescription, error) { func (eStorage *encodedStorage) List() ([]storage.ObjectDescription, error) {
return nil, errors.New("Not Implemented") var objectDescList []storage.ObjectDescription
for objectName, objectEntry := range eStorage.objects {
var objectDescription storage.ObjectDescription
protectionLevel := strconv.Itoa(objectEntry.Encoderparams.K) + "," + strconv.Itoa(objectEntry.Encoderparams.M)
objectDescription.Name = objectName
objectDescription.Md5sum = hex.EncodeToString(objectEntry.Md5sum)
objectDescription.Protectionlevel = protectionLevel
objectDescList = append(objectDescList, objectDescription)
}
if len(objectDescList) == 0 {
return nil, errors.New("No objects found")
}
return objectDescList, nil
} }
func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error { func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error {
@ -124,8 +137,6 @@ func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error {
// encode // encode
for chunk := range chunks { for chunk := range chunks {
if chunk.Err == nil { if chunk.Err == nil {
// md5sum on chunk
hash.Write(chunk.Data)
// encode each // encode each
blocks, length := encoder.Encode(chunk.Data) blocks, length := encoder.Encode(chunk.Data)
// store each // store each
@ -135,6 +146,8 @@ func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error {
return err return err
} }
} }
// md5sum only after chunk is committed to disk
hash.Write(chunk.Data)
blockEntry := StorageBlockEntry{ blockEntry := StorageBlockEntry{
Index: i, Index: i,
Length: length, Length: length,

@ -38,9 +38,9 @@ func (s *EncodedStorageSuite) TestFileStoragePutAtRootPath(c *C) {
object1, _ := ioutil.ReadAll(objectResult1) object1, _ := ioutil.ReadAll(objectResult1)
c.Assert(string(object1), Equals, "object1") c.Assert(string(object1), Equals, "object1")
// objectList, err := objectStorage.List("/") objectList, err := objectStorage.List()
// c.Assert(err, IsNil) c.Assert(err, IsNil)
// c.Assert(objectList[0].Path, Equals, "path1") c.Assert(objectList[0].Name, Equals, "path1")
} }
func (s *EncodedStorageSuite) TestFileStoragePutDirPath(c *C) { func (s *EncodedStorageSuite) TestFileStoragePutDirPath(c *C) {
@ -70,28 +70,6 @@ func (s *EncodedStorageSuite) TestFileStoragePutDirPath(c *C) {
objectBuffer3 := bytes.NewBuffer([]byte("object3")) objectBuffer3 := bytes.NewBuffer([]byte("object3"))
err = objectStorage.Put("object3", objectBuffer3) err = objectStorage.Put("object3", objectBuffer3)
c.Assert(err, IsNil) c.Assert(err, IsNil)
// TODO support list
// objectList, err := objectStorage.List("/")
// c.Assert(err, IsNil)
// c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "object3", IsDir: false, Hash: ""})
// c.Assert(objectList[1], Equals, storage.ObjectDescription{Path: "path1", IsDir: true, Hash: ""})
// c.Assert(objectList[2], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""})
// c.Assert(len(objectList), Equals, 3)
//
// objectList, err = objectStorage.List("/path1")
// c.Assert(err, IsNil)
// c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""})
// c.Assert(len(objectList), Equals, 1)
//
// objectList, err = objectStorage.List("/path1/path2")
// c.Assert(err, IsNil)
// c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path3", IsDir: false, Hash: ""})
// c.Assert(len(objectList), Equals, 1)
//
// objectList, err = objectStorage.List("/path1/path2/path3")
// c.Assert(err, Not(IsNil))
// c.Assert(objectList, IsNil)
} }
func (s *EncodedStorageSuite) TestObjectWithChunking(c *C) { func (s *EncodedStorageSuite) TestObjectWithChunking(c *C) {

@ -21,8 +21,8 @@ func NewStorage(rootDir string) (storage.ObjectStorage, error) {
return &newStorage, nil return &newStorage, nil
} }
func (fsStorage *fileSystemStorage) List(listPath string) ([]storage.ObjectDescription, error) { func (fsStorage *fileSystemStorage) List() ([]storage.ObjectDescription, error) {
fileInfos, err := ioutil.ReadDir(path.Join(fsStorage.RootDir, listPath)) fileInfos, err := ioutil.ReadDir(fsStorage.RootDir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -31,9 +31,9 @@ func (fsStorage *fileSystemStorage) List(listPath string) ([]storage.ObjectDescr
for _, fi := range fileInfos { for _, fi := range fileInfos {
description := storage.ObjectDescription{ description := storage.ObjectDescription{
Path: fi.Name(), Name: fi.Name(),
IsDir: fi.IsDir(), Md5sum: "",
Hash: "", // TODO Protectionlevel: "",
} }
descriptions = append(descriptions, description) descriptions = append(descriptions, description)
} }

@ -37,56 +37,7 @@ func (s *fileSystemStorageSuite) TestfileStoragePutAtRootPath(c *C) {
object1, _ := ioutil.ReadAll(objectResult1) object1, _ := ioutil.ReadAll(objectResult1)
c.Assert(string(object1), Equals, "object1") c.Assert(string(object1), Equals, "object1")
objectList, err := objectStorage.List("/") objectList, err := objectStorage.List()
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(objectList[0].Path, Equals, "path1") c.Assert(objectList[0].Name, Equals, "path1")
}
func (s *fileSystemStorageSuite) TestfileStoragePutDirPath(c *C) {
rootDir, err := makeTempTestDir()
c.Assert(err, IsNil)
defer os.RemoveAll(rootDir)
var objectStorage storage.ObjectStorage
objectStorage, _ = NewStorage(rootDir)
objectBuffer1 := bytes.NewBuffer([]byte("object1"))
objectStorage.Put("path1/path2/path3", objectBuffer1)
// assert object1 was created in correct path
objectResult1, err := objectStorage.Get("path1/path2/path3")
c.Assert(err, IsNil)
object1, _ := ioutil.ReadAll(objectResult1)
c.Assert(string(object1), Equals, "object1")
// add second object
objectBuffer2 := bytes.NewBuffer([]byte("object2"))
err = objectStorage.Put("path2/path2/path2", objectBuffer2)
c.Assert(err, IsNil)
// add third object
objectBuffer3 := bytes.NewBuffer([]byte("object3"))
err = objectStorage.Put("object3", objectBuffer3)
c.Assert(err, IsNil)
objectList, err := objectStorage.List("/")
c.Assert(err, IsNil)
c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "object3", IsDir: false, Hash: ""})
c.Assert(objectList[1], Equals, storage.ObjectDescription{Path: "path1", IsDir: true, Hash: ""})
c.Assert(objectList[2], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""})
c.Assert(len(objectList), Equals, 3)
objectList, err = objectStorage.List("/path1")
c.Assert(err, IsNil)
c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""})
c.Assert(len(objectList), Equals, 1)
objectList, err = objectStorage.List("/path1/path2")
c.Assert(err, IsNil)
c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path3", IsDir: false, Hash: ""})
c.Assert(len(objectList), Equals, 1)
objectList, err = objectStorage.List("/path1/path2/path3")
c.Assert(err, Not(IsNil))
c.Assert(objectList, IsNil)
} }

@ -3,13 +3,14 @@ package storage
import "io" import "io"
type ObjectStorage interface { type ObjectStorage interface {
List(path string) ([]ObjectDescription, error) List() ([]ObjectDescription, error)
Get(path string) (io.Reader, error) Get(path string) (io.Reader, error)
Put(path string, object io.Reader) error Put(path string, object io.Reader) error
} }
type ObjectDescription struct { type ObjectDescription struct {
Path string Name string
IsDir bool Md5sum string
Hash string Protectionlevel string
// Hash string - TODO
} }

Loading…
Cancel
Save