Merge pull request #126 from fkautz/pr_out_removing_fsstorage
commit
16e92521bd
@ -1,53 +0,0 @@ |
|||||||
package main |
|
||||||
|
|
||||||
import ( |
|
||||||
"bytes" |
|
||||||
"encoding/json" |
|
||||||
"io" |
|
||||||
"os" |
|
||||||
"path" |
|
||||||
|
|
||||||
"github.com/minio-io/minio/pkgs/storage" |
|
||||||
"github.com/minio-io/minio/pkgs/storage/fsstorage" |
|
||||||
) |
|
||||||
|
|
||||||
func fsGetList(config inputConfig) (io.Reader, error) { |
|
||||||
var objectStorage storage.ObjectStorage |
|
||||||
rootDir := path.Join(config.rootDir, config.storageDriver) |
|
||||||
objectStorage, _ = fsstorage.NewStorage(rootDir, config.blockSize) |
|
||||||
objectList, err := objectStorage.List() |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
var objectListBytes []byte |
|
||||||
if objectListBytes, err = json.Marshal(objectList); err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
objectListBuffer := bytes.NewBuffer(objectListBytes) |
|
||||||
return objectListBuffer, nil |
|
||||||
} |
|
||||||
|
|
||||||
func fsGet(config inputConfig, objectPath string) (io.Reader, error) { |
|
||||||
var objectStorage storage.ObjectStorage |
|
||||||
rootDir := path.Join(config.rootDir, config.storageDriver) |
|
||||||
objectStorage, _ = fsstorage.NewStorage(rootDir, config.blockSize) |
|
||||||
object, err := objectStorage.Get(objectPath) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
return object, nil |
|
||||||
} |
|
||||||
|
|
||||||
func fsPut(config inputConfig, objectPath string, reader io.Reader) error { |
|
||||||
var err error |
|
||||||
rootDir := path.Join(config.rootDir, config.storageDriver) |
|
||||||
if err := os.MkdirAll(config.rootDir, 0700); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
var objectStorage storage.ObjectStorage |
|
||||||
objectStorage, _ = fsstorage.NewStorage(rootDir, config.blockSize) |
|
||||||
if err = objectStorage.Put(objectPath, reader); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
return nil |
|
||||||
} |
|
@ -1,11 +0,0 @@ |
|||||||
all: build test |
|
||||||
.PHONY: all |
|
||||||
|
|
||||||
build: |
|
||||||
@godep go build
|
|
||||||
|
|
||||||
test: build |
|
||||||
@godep go test -race -coverprofile=cover.out
|
|
||||||
|
|
||||||
clean: |
|
||||||
@rm -v cover.out
|
|
@ -1,177 +0,0 @@ |
|||||||
/* |
|
||||||
* Mini Object Storage, (C) 2014 Minio, Inc. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package fsstorage |
|
||||||
|
|
||||||
import ( |
|
||||||
"bytes" |
|
||||||
"crypto/md5" |
|
||||||
"encoding/gob" |
|
||||||
"encoding/hex" |
|
||||||
"errors" |
|
||||||
"io" |
|
||||||
"io/ioutil" |
|
||||||
"os" |
|
||||||
"path" |
|
||||||
"strconv" |
|
||||||
|
|
||||||
"github.com/minio-io/minio/pkgs/split" |
|
||||||
"github.com/minio-io/minio/pkgs/storage" |
|
||||||
"github.com/minio-io/minio/pkgs/storage/appendstorage" |
|
||||||
"github.com/spaolacci/murmur3" |
|
||||||
) |
|
||||||
|
|
||||||
type fileSystemStorage struct { |
|
||||||
RootDir string |
|
||||||
BlockSize uint64 |
|
||||||
diskStorage []storage.ObjectStorage |
|
||||||
objects map[string]StorageEntry |
|
||||||
} |
|
||||||
|
|
||||||
type StorageEntry struct { |
|
||||||
Path string |
|
||||||
Md5sum []byte |
|
||||||
Murmurhash uint64 |
|
||||||
ChunkLength int |
|
||||||
} |
|
||||||
|
|
||||||
func NewStorage(rootDir string, blockSize uint64) (storage.ObjectStorage, error) { |
|
||||||
var storageNodes []storage.ObjectStorage |
|
||||||
storageNode, err := appendstorage.NewStorage(rootDir, 0) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
storageNodes = append(storageNodes, storageNode) |
|
||||||
objects := make(map[string]StorageEntry) |
|
||||||
indexPath := path.Join(rootDir, "index") |
|
||||||
if _, err := os.Stat(indexPath); err == nil { |
|
||||||
indexFile, err := os.Open(indexPath) |
|
||||||
defer indexFile.Close() |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
encoder := gob.NewDecoder(indexFile) |
|
||||||
err = encoder.Decode(&objects) |
|
||||||
if err != nil && err != io.EOF { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
} |
|
||||||
newStorage := fileSystemStorage{ |
|
||||||
RootDir: rootDir, |
|
||||||
diskStorage: storageNodes, |
|
||||||
BlockSize: blockSize, |
|
||||||
objects: objects, |
|
||||||
} |
|
||||||
return &newStorage, nil |
|
||||||
} |
|
||||||
|
|
||||||
func (fsStorage *fileSystemStorage) List() ([]storage.ObjectDescription, error) { |
|
||||||
var objectDescList []storage.ObjectDescription |
|
||||||
for objectName, objectEntry := range fsStorage.objects { |
|
||||||
var objectDescription storage.ObjectDescription |
|
||||||
objectDescription.Name = objectName |
|
||||||
objectDescription.Md5sum = hex.EncodeToString(objectEntry.Md5sum) |
|
||||||
objectDescription.Murmur3 = strconv.FormatUint(objectEntry.Murmurhash, 16) |
|
||||||
objectDescList = append(objectDescList, objectDescription) |
|
||||||
} |
|
||||||
if len(objectDescList) == 0 { |
|
||||||
return nil, errors.New("No objects found") |
|
||||||
} |
|
||||||
return objectDescList, nil |
|
||||||
} |
|
||||||
|
|
||||||
func (fsStorage *fileSystemStorage) Get(objectPath string) (io.Reader, error) { |
|
||||||
entry, ok := fsStorage.objects[objectPath] |
|
||||||
if ok == false { |
|
||||||
return nil, nil |
|
||||||
} |
|
||||||
reader, writer := io.Pipe() |
|
||||||
go fsStorage.readObject(objectPath, entry, writer) |
|
||||||
return reader, nil |
|
||||||
} |
|
||||||
|
|
||||||
func (fsStorage *fileSystemStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) { |
|
||||||
appendStorage := fsStorage.diskStorage[0] |
|
||||||
for i := 0; i < entry.ChunkLength; i++ { |
|
||||||
chunkObjectPath := objectPath + "$" + strconv.Itoa(i) |
|
||||||
chunkObject, err := appendStorage.Get(chunkObjectPath) |
|
||||||
|
|
||||||
if err != nil { |
|
||||||
writer.CloseWithError(err) |
|
||||||
} |
|
||||||
data, readErr := ioutil.ReadAll(chunkObject) |
|
||||||
|
|
||||||
if readErr != nil { |
|
||||||
writer.CloseWithError(readErr) |
|
||||||
} |
|
||||||
bytesWritten := 0 |
|
||||||
for bytesWritten != len(data) { |
|
||||||
written, err := writer.Write(data[bytesWritten:len(data)]) |
|
||||||
if err != nil { |
|
||||||
writer.CloseWithError(err) |
|
||||||
} |
|
||||||
bytesWritten += written |
|
||||||
} |
|
||||||
} |
|
||||||
writer.Close() |
|
||||||
} |
|
||||||
|
|
||||||
func (fsStorage *fileSystemStorage) Put(objectPath string, object io.Reader) error { |
|
||||||
// split
|
|
||||||
chunks := make(chan split.SplitMessage) |
|
||||||
go split.SplitStream(object, fsStorage.BlockSize, chunks) |
|
||||||
|
|
||||||
entry := StorageEntry{ |
|
||||||
Path: objectPath, |
|
||||||
Md5sum: nil, |
|
||||||
Murmurhash: 0, |
|
||||||
ChunkLength: 0, |
|
||||||
} |
|
||||||
|
|
||||||
murmur := murmur3.Sum64([]byte(objectPath)) |
|
||||||
hash := md5.New() |
|
||||||
i := 0 |
|
||||||
for chunk := range chunks { |
|
||||||
if chunk.Err != nil { |
|
||||||
return chunk.Err |
|
||||||
} |
|
||||||
err := fsStorage.storeBlocks(objectPath, i, chunk.Data) |
|
||||||
if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
// md5sum only after chunk is committed to disk
|
|
||||||
hash.Write(chunk.Data) |
|
||||||
i++ |
|
||||||
} |
|
||||||
entry.Md5sum = hash.Sum(nil) |
|
||||||
entry.ChunkLength = i |
|
||||||
entry.Murmurhash = murmur |
|
||||||
fsStorage.objects[objectPath] = entry |
|
||||||
var gobBuffer bytes.Buffer |
|
||||||
gobEncoder := gob.NewEncoder(&gobBuffer) |
|
||||||
gobEncoder.Encode(fsStorage.objects) |
|
||||||
ioutil.WriteFile(path.Join(fsStorage.RootDir, "index"), gobBuffer.Bytes(), 0600) |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
func (fsStorage *fileSystemStorage) storeBlocks(objectPath string, index int, chunk []byte) error { |
|
||||||
appendStorage := fsStorage.diskStorage[0] |
|
||||||
path := objectPath + "$" + strconv.Itoa(index) |
|
||||||
if err := appendStorage.Put(path, bytes.NewBuffer(chunk)); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
return nil |
|
||||||
} |
|
@ -1,44 +0,0 @@ |
|||||||
package fsstorage |
|
||||||
|
|
||||||
import ( |
|
||||||
"bytes" |
|
||||||
"io/ioutil" |
|
||||||
"os" |
|
||||||
"testing" |
|
||||||
|
|
||||||
"github.com/minio-io/minio/pkgs/storage" |
|
||||||
"github.com/minio-io/minio/pkgs/utils" |
|
||||||
. "gopkg.in/check.v1" |
|
||||||
) |
|
||||||
|
|
||||||
type fileSystemStorageSuite struct{} |
|
||||||
|
|
||||||
var _ = Suite(&fileSystemStorageSuite{}) |
|
||||||
|
|
||||||
func Test(t *testing.T) { TestingT(t) } |
|
||||||
|
|
||||||
func (s *fileSystemStorageSuite) TestfileStoragePutAtRootPath(c *C) { |
|
||||||
rootDir, err := utils.MakeTempTestDir() |
|
||||||
c.Assert(err, IsNil) |
|
||||||
defer os.RemoveAll(rootDir) |
|
||||||
|
|
||||||
var objectStorage storage.ObjectStorage |
|
||||||
objectStorage, _ = NewStorage(rootDir, 1024) |
|
||||||
|
|
||||||
objectBuffer := bytes.NewBuffer([]byte("object1")) |
|
||||||
err = objectStorage.Put("path1", objectBuffer) |
|
||||||
c.Assert(err, IsNil) |
|
||||||
|
|
||||||
// assert object1 was created in correct path
|
|
||||||
objectResult1, err := objectStorage.Get("path1") |
|
||||||
c.Assert(err, IsNil) |
|
||||||
|
|
||||||
object1, err := ioutil.ReadAll(objectResult1) |
|
||||||
c.Assert(err, IsNil) |
|
||||||
|
|
||||||
c.Assert(string(object1), Equals, "object1") |
|
||||||
|
|
||||||
objectList, err := objectStorage.List() |
|
||||||
c.Assert(err, IsNil) |
|
||||||
c.Assert(objectList[0].Name, Equals, "path1") |
|
||||||
} |
|
Loading…
Reference in new issue