- also takes in 'blockSize' cli option. - robustness fixesmaster
parent
b570b486de
commit
fc50291cac
@ -1,57 +1,156 @@ |
|||||||
package fsstorage |
package fsstorage |
||||||
|
|
||||||
import ( |
import ( |
||||||
|
"bytes" |
||||||
|
"crypto/md5" |
||||||
|
"encoding/gob" |
||||||
|
"encoding/hex" |
||||||
|
"errors" |
||||||
"io" |
"io" |
||||||
"io/ioutil" |
"io/ioutil" |
||||||
"os" |
"os" |
||||||
"path" |
"path" |
||||||
"path/filepath" |
"strconv" |
||||||
|
|
||||||
|
"github.com/minio-io/minio/pkgs/split" |
||||||
"github.com/minio-io/minio/pkgs/storage" |
"github.com/minio-io/minio/pkgs/storage" |
||||||
|
"github.com/minio-io/minio/pkgs/storage/appendstorage" |
||||||
) |
) |
||||||
|
|
||||||
type fileSystemStorage struct { |
type fileSystemStorage struct { |
||||||
RootDir string |
RootDir string |
||||||
|
BlockSize uint64 |
||||||
|
diskStorage []storage.ObjectStorage |
||||||
|
objects map[string]StorageEntry |
||||||
} |
} |
||||||
|
|
||||||
func NewStorage(rootDir string) (storage.ObjectStorage, error) { |
type StorageEntry struct { |
||||||
|
Path string |
||||||
|
Md5sum []byte |
||||||
|
ChunkLength int |
||||||
|
} |
||||||
|
|
||||||
|
func NewStorage(rootDir string, blockSize uint64) (storage.ObjectStorage, error) { |
||||||
|
var storageNodes []storage.ObjectStorage |
||||||
|
storageNode, err := appendstorage.NewStorage(rootDir, 0) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
storageNodes = append(storageNodes, storageNode) |
||||||
|
objects := make(map[string]StorageEntry) |
||||||
|
indexPath := path.Join(rootDir, "index") |
||||||
|
if _, err := os.Stat(indexPath); err == nil { |
||||||
|
indexFile, err := os.Open(indexPath) |
||||||
|
defer indexFile.Close() |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
encoder := gob.NewDecoder(indexFile) |
||||||
|
err = encoder.Decode(&objects) |
||||||
|
if err != nil && err != io.EOF { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
} |
||||||
newStorage := fileSystemStorage{ |
newStorage := fileSystemStorage{ |
||||||
RootDir: rootDir, |
RootDir: rootDir, |
||||||
|
diskStorage: storageNodes, |
||||||
|
BlockSize: blockSize, |
||||||
|
objects: objects, |
||||||
} |
} |
||||||
return &newStorage, nil |
return &newStorage, nil |
||||||
} |
} |
||||||
|
|
||||||
func (fsStorage *fileSystemStorage) List() ([]storage.ObjectDescription, error) { |
func (fsStorage *fileSystemStorage) List() ([]storage.ObjectDescription, error) { |
||||||
fileInfos, err := ioutil.ReadDir(fsStorage.RootDir) |
var objectDescList []storage.ObjectDescription |
||||||
if err != nil { |
for objectName, objectEntry := range fsStorage.objects { |
||||||
return nil, err |
var objectDescription storage.ObjectDescription |
||||||
|
objectDescription.Name = objectName |
||||||
|
objectDescription.Md5sum = hex.EncodeToString(objectEntry.Md5sum) |
||||||
|
objectDescription.Protectionlevel = "" |
||||||
|
objectDescList = append(objectDescList, objectDescription) |
||||||
|
} |
||||||
|
if len(objectDescList) == 0 { |
||||||
|
return nil, errors.New("No objects found") |
||||||
|
} |
||||||
|
return objectDescList, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (fsStorage *fileSystemStorage) Get(objectPath string) (io.Reader, error) { |
||||||
|
entry, ok := fsStorage.objects[objectPath] |
||||||
|
if ok == false { |
||||||
|
return nil, nil |
||||||
} |
} |
||||||
|
reader, writer := io.Pipe() |
||||||
|
go fsStorage.readObject(objectPath, entry, writer) |
||||||
|
return reader, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (fsStorage *fileSystemStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) { |
||||||
|
appendStorage := fsStorage.diskStorage[0] |
||||||
|
for i := 0; i < entry.ChunkLength; i++ { |
||||||
|
chunkObjectPath := objectPath + "$" + strconv.Itoa(i) |
||||||
|
chunkObject, err := appendStorage.Get(chunkObjectPath) |
||||||
|
|
||||||
var descriptions []storage.ObjectDescription |
if err != nil { |
||||||
|
writer.CloseWithError(err) |
||||||
|
} |
||||||
|
data, readErr := ioutil.ReadAll(chunkObject) |
||||||
|
|
||||||
for _, fi := range fileInfos { |
if readErr != nil { |
||||||
description := storage.ObjectDescription{ |
writer.CloseWithError(readErr) |
||||||
Name: fi.Name(), |
} |
||||||
Md5sum: "", |
bytesWritten := 0 |
||||||
Protectionlevel: "", |
for bytesWritten != len(data) { |
||||||
|
written, err := writer.Write(data[bytesWritten:len(data)]) |
||||||
|
if err != nil { |
||||||
|
writer.CloseWithError(err) |
||||||
|
} |
||||||
|
bytesWritten += written |
||||||
} |
} |
||||||
descriptions = append(descriptions, description) |
|
||||||
} |
} |
||||||
return descriptions, nil |
writer.Close() |
||||||
} |
} |
||||||
|
|
||||||
func (storage *fileSystemStorage) Get(objectPath string) (io.Reader, error) { |
func (fsStorage *fileSystemStorage) Put(objectPath string, object io.Reader) error { |
||||||
return os.Open(path.Join(storage.RootDir, objectPath)) |
// split
|
||||||
} |
chunks := make(chan split.SplitMessage) |
||||||
|
go split.SplitStream(object, fsStorage.BlockSize, chunks) |
||||||
|
|
||||||
func (storage *fileSystemStorage) Put(objectPath string, object io.Reader) error { |
entry := StorageEntry{ |
||||||
err := os.MkdirAll(filepath.Dir(path.Join(storage.RootDir, objectPath)), 0700) |
Path: objectPath, |
||||||
if err != nil { |
Md5sum: nil, |
||||||
return err |
ChunkLength: 0, |
||||||
} |
} |
||||||
objectBytes, err := ioutil.ReadAll(object) |
|
||||||
if err != nil { |
hash := md5.New() |
||||||
|
i := 0 |
||||||
|
for chunk := range chunks { |
||||||
|
if chunk.Err != nil { |
||||||
|
return chunk.Err |
||||||
|
} |
||||||
|
err := fsStorage.storeBlocks(objectPath, i, chunk.Data) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
// md5sum only after chunk is committed to disk
|
||||||
|
hash.Write(chunk.Data) |
||||||
|
i++ |
||||||
|
} |
||||||
|
entry.Md5sum = hash.Sum(nil) |
||||||
|
entry.ChunkLength = i |
||||||
|
fsStorage.objects[objectPath] = entry |
||||||
|
var gobBuffer bytes.Buffer |
||||||
|
gobEncoder := gob.NewEncoder(&gobBuffer) |
||||||
|
gobEncoder.Encode(fsStorage.objects) |
||||||
|
ioutil.WriteFile(path.Join(fsStorage.RootDir, "index"), gobBuffer.Bytes(), 0600) |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
func (fsStorage *fileSystemStorage) storeBlocks(objectPath string, index int, chunk []byte) error { |
||||||
|
appendStorage := fsStorage.diskStorage[0] |
||||||
|
path := objectPath + "$" + strconv.Itoa(index) |
||||||
|
if err := appendStorage.Put(path, bytes.NewBuffer(chunk)); err != nil { |
||||||
return err |
return err |
||||||
} |
} |
||||||
return ioutil.WriteFile(path.Join(storage.RootDir, objectPath), objectBytes, 0600) |
return nil |
||||||
} |
} |
||||||
|
Loading…
Reference in new issue