|
|
|
@ -23,7 +23,6 @@ import ( |
|
|
|
|
"io" |
|
|
|
|
"path/filepath" |
|
|
|
|
"sort" |
|
|
|
|
"strconv" |
|
|
|
|
"strings" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
@ -44,7 +43,6 @@ type bucket struct { |
|
|
|
|
time time.Time |
|
|
|
|
donutName string |
|
|
|
|
nodes map[string]node |
|
|
|
|
objects map[string]object |
|
|
|
|
lock *sync.RWMutex |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -66,13 +64,15 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bu |
|
|
|
|
b.time = t |
|
|
|
|
b.donutName = donutName |
|
|
|
|
b.nodes = nodes |
|
|
|
|
b.objects = make(map[string]object) |
|
|
|
|
b.lock = new(sync.RWMutex) |
|
|
|
|
|
|
|
|
|
metadata := BucketMetadata{} |
|
|
|
|
metadata.Version = bucketMetadataVersion |
|
|
|
|
metadata.Name = bucketName |
|
|
|
|
metadata.ACL = aclType |
|
|
|
|
metadata.Created = t |
|
|
|
|
metadata.Metadata = make(map[string]string) |
|
|
|
|
metadata.BucketObjectsMetadata = make(map[string]map[string]string) |
|
|
|
|
|
|
|
|
|
return b, metadata, nil |
|
|
|
|
} |
|
|
|
@ -81,64 +81,85 @@ func (b bucket) getBucketName() string { |
|
|
|
|
return b.name |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b bucket) getObjectName(fileName, diskPath, bucketPath string) (string, error) { |
|
|
|
|
newObject, err := newObject(fileName, filepath.Join(diskPath, bucketPath)) |
|
|
|
|
func (b bucket) GetObjectMetadata(objectName string) (ObjectMetadata, error) { |
|
|
|
|
b.lock.RLock() |
|
|
|
|
defer b.lock.RUnlock() |
|
|
|
|
metadataReaders, err := b.getDiskReaders(normalizeObjectName(objectName), objectMetadataConfig) |
|
|
|
|
if err != nil { |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
return ObjectMetadata{}, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
for _, metadataReader := range metadataReaders { |
|
|
|
|
defer metadataReader.Close() |
|
|
|
|
} |
|
|
|
|
objMetadata := ObjectMetadata{} |
|
|
|
|
for _, metadataReader := range metadataReaders { |
|
|
|
|
jdec := json.NewDecoder(metadataReader) |
|
|
|
|
if err := jdec.Decode(&objMetadata); err != nil { |
|
|
|
|
return ObjectMetadata{}, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
newObjectMetadata, err := newObject.GetObjectMetadata() |
|
|
|
|
return objMetadata, nil |
|
|
|
|
} |
|
|
|
|
return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b bucket) getBucketMetadataReaders() ([]io.ReadCloser, error) { |
|
|
|
|
var readers []io.ReadCloser |
|
|
|
|
for _, node := range b.nodes { |
|
|
|
|
disks, err := node.ListDisks() |
|
|
|
|
if err != nil { |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
readers = make([]io.ReadCloser, len(disks)) |
|
|
|
|
for order, disk := range disks { |
|
|
|
|
bucketMetaDataReader, err := disk.OpenFile(filepath.Join(b.donutName, bucketMetadataConfig)) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
if newObjectMetadata.Object == "" { |
|
|
|
|
return "", iodine.New(ObjectCorrupted{Object: newObject.name}, nil) |
|
|
|
|
readers[order] = bucketMetaDataReader |
|
|
|
|
} |
|
|
|
|
b.objects[newObjectMetadata.Object] = newObject |
|
|
|
|
return newObjectMetadata.Object, nil |
|
|
|
|
} |
|
|
|
|
return readers, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b bucket) GetObjectMetadata(objectName string) (ObjectMetadata, error) { |
|
|
|
|
return b.objects[objectName].GetObjectMetadata() |
|
|
|
|
func (b bucket) getBucketMetadata() (*AllBuckets, error) { |
|
|
|
|
metadata := new(AllBuckets) |
|
|
|
|
readers, err := b.getBucketMetadataReaders() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
for _, reader := range readers { |
|
|
|
|
defer reader.Close() |
|
|
|
|
} |
|
|
|
|
for _, reader := range readers { |
|
|
|
|
jenc := json.NewDecoder(reader) |
|
|
|
|
if err := jenc.Decode(metadata); err != nil { |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
return metadata, nil |
|
|
|
|
} |
|
|
|
|
return nil, iodine.New(InvalidArgument{}, nil) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ListObjects - list all objects
|
|
|
|
|
func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) ([]string, []string, bool, error) { |
|
|
|
|
b.lock.RLock() |
|
|
|
|
defer b.lock.RUnlock() |
|
|
|
|
|
|
|
|
|
if maxkeys <= 0 { |
|
|
|
|
maxkeys = 1000 |
|
|
|
|
} |
|
|
|
|
var isTruncated bool |
|
|
|
|
nodeSlice := 0 |
|
|
|
|
var objects []string |
|
|
|
|
for _, node := range b.nodes { |
|
|
|
|
disks, err := node.ListDisks() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, nil, false, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
for order, disk := range disks { |
|
|
|
|
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order) |
|
|
|
|
bucketPath := filepath.Join(b.donutName, bucketSlice) |
|
|
|
|
files, err := disk.ListDir(bucketPath) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, nil, false, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
for _, file := range files { |
|
|
|
|
objectName, err := b.getObjectName(file.Name(), disk.GetPath(), bucketPath) |
|
|
|
|
bucketMetadata, err := b.getBucketMetadata() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, nil, false, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
for objectName := range bucketMetadata.Buckets[b.getBucketName()].BucketObjectsMetadata { |
|
|
|
|
if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) { |
|
|
|
|
if objectName > marker { |
|
|
|
|
objects = appendUniq(objects, objectName) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
nodeSlice = nodeSlice + 1 |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
if strings.TrimSpace(prefix) != "" { |
|
|
|
|
objects = removePrefix(objects, prefix) |
|
|
|
|
} |
|
|
|
@ -170,7 +191,6 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) ([]st |
|
|
|
|
sort.Strings(commonPrefixes) |
|
|
|
|
return results, commonPrefixes, isTruncated, nil |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ReadObject - open an object to read
|
|
|
|
|
func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64, err error) { |
|
|
|
@ -178,58 +198,58 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64, |
|
|
|
|
defer b.lock.RUnlock() |
|
|
|
|
reader, writer := io.Pipe() |
|
|
|
|
// get list of objects
|
|
|
|
|
_, _, _, err = b.ListObjects(objectName, "", "", 1) |
|
|
|
|
bucketMetadata, err := b.getBucketMetadata() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
// check if object exists
|
|
|
|
|
object, ok := b.objects[objectName] |
|
|
|
|
if !ok { |
|
|
|
|
if _, ok := bucketMetadata.Buckets[b.getBucketName()].BucketObjectsMetadata[objectName]; !ok { |
|
|
|
|
return nil, 0, iodine.New(ObjectNotFound{Object: objectName}, nil) |
|
|
|
|
} |
|
|
|
|
// verify if sysObjectMetadata is readable, before we server the request
|
|
|
|
|
sysObjMetadata, err := object.GetSystemObjectMetadata() |
|
|
|
|
objMetadata := ObjectMetadata{} |
|
|
|
|
metadataReaders, err := b.getDiskReaders(normalizeObjectName(objectName), objectMetadataConfig) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
for _, metadataReader := range metadataReaders { |
|
|
|
|
defer metadataReader.Close() |
|
|
|
|
} |
|
|
|
|
for _, metadataReader := range metadataReaders { |
|
|
|
|
jdec := json.NewDecoder(metadataReader) |
|
|
|
|
if err := jdec.Decode(&objMetadata); err != nil { |
|
|
|
|
return nil, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
// read and reply back to GetObject() request in a go-routine
|
|
|
|
|
go b.readEncodedData(b.normalizeObjectName(objectName), writer, sysObjMetadata) |
|
|
|
|
return reader, sysObjMetadata.Size, nil |
|
|
|
|
go b.readEncodedData(normalizeObjectName(objectName), writer, objMetadata) |
|
|
|
|
return reader, objMetadata.Size, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// WriteObject - write a new object into bucket
|
|
|
|
|
func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string) (string, error) { |
|
|
|
|
func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string) (string, error) { |
|
|
|
|
b.lock.Lock() |
|
|
|
|
defer b.lock.Unlock() |
|
|
|
|
if objectName == "" || objectData == nil { |
|
|
|
|
return "", iodine.New(InvalidArgument{}, nil) |
|
|
|
|
} |
|
|
|
|
writers, err := b.getDiskWriters(b.normalizeObjectName(objectName), "data") |
|
|
|
|
writers, err := b.getDiskWriters(normalizeObjectName(objectName), "data") |
|
|
|
|
if err != nil { |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
sumMD5 := md5.New() |
|
|
|
|
sum512 := sha512.New() |
|
|
|
|
|
|
|
|
|
objMetadata := new(ObjectMetadata) |
|
|
|
|
sysObjMetadata := new(SystemObjectMetadata) |
|
|
|
|
objMetadata.Version = objectMetadataVersion |
|
|
|
|
sysObjMetadata.Version = systemObjectMetadataVersion |
|
|
|
|
size := metadata["contentLength"] |
|
|
|
|
sizeInt, err := strconv.ParseInt(size, 10, 64) |
|
|
|
|
if err != nil { |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
objMetadata.Created = time.Now().UTC() |
|
|
|
|
// if total writers are only '1' do not compute erasure
|
|
|
|
|
switch len(writers) == 1 { |
|
|
|
|
case true: |
|
|
|
|
mw := io.MultiWriter(writers[0], sumMD5, sum512) |
|
|
|
|
totalLength, err := io.CopyN(mw, objectData, sizeInt) |
|
|
|
|
totalLength, err := io.Copy(mw, objectData) |
|
|
|
|
if err != nil { |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
sysObjMetadata.Size = totalLength |
|
|
|
|
objMetadata.Size = totalLength |
|
|
|
|
case false: |
|
|
|
|
// calculate data and parity dictated by total number of writers
|
|
|
|
@ -243,30 +263,20 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
/// donutMetadata section
|
|
|
|
|
sysObjMetadata.BlockSize = 10 * 1024 * 1024 |
|
|
|
|
sysObjMetadata.ChunkCount = chunkCount |
|
|
|
|
sysObjMetadata.DataDisks = k |
|
|
|
|
sysObjMetadata.ParityDisks = m |
|
|
|
|
sysObjMetadata.ErasureTechnique = "Cauchy" |
|
|
|
|
sysObjMetadata.Size = int64(totalLength) |
|
|
|
|
// keep size inside ObjectMetadata as well for Object API requests
|
|
|
|
|
objMetadata.BlockSize = 10 * 1024 * 1024 |
|
|
|
|
objMetadata.ChunkCount = chunkCount |
|
|
|
|
objMetadata.DataDisks = k |
|
|
|
|
objMetadata.ParityDisks = m |
|
|
|
|
objMetadata.ErasureTechnique = "Cauchy" |
|
|
|
|
objMetadata.Size = int64(totalLength) |
|
|
|
|
} |
|
|
|
|
objMetadata.Bucket = b.getBucketName() |
|
|
|
|
objMetadata.Object = objectName |
|
|
|
|
objMetadata.Metadata = metadata |
|
|
|
|
dataMD5sum := sumMD5.Sum(nil) |
|
|
|
|
dataSHA512sum := sum512.Sum(nil) |
|
|
|
|
objMetadata.Created = time.Now().UTC() |
|
|
|
|
|
|
|
|
|
// keeping md5sum for the object in two different places
|
|
|
|
|
// one for object storage and another is for internal use
|
|
|
|
|
hexMD5Sum := hex.EncodeToString(dataMD5sum) |
|
|
|
|
hex512Sum := hex.EncodeToString(dataSHA512sum) |
|
|
|
|
objMetadata.MD5Sum = hexMD5Sum |
|
|
|
|
objMetadata.SHA512Sum = hex512Sum |
|
|
|
|
sysObjMetadata.MD5Sum = hexMD5Sum |
|
|
|
|
sysObjMetadata.SHA512Sum = hex512Sum |
|
|
|
|
objMetadata.MD5Sum = hex.EncodeToString(dataMD5sum) |
|
|
|
|
objMetadata.SHA512Sum = hex.EncodeToString(dataSHA512sum) |
|
|
|
|
|
|
|
|
|
// Verify if the written object is equal to what is expected, only if it is requested as such
|
|
|
|
|
if strings.TrimSpace(expectedMD5Sum) != "" { |
|
|
|
@ -274,12 +284,8 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// write donut specific metadata
|
|
|
|
|
if err := b.writeSystemObjectMetadata(b.normalizeObjectName(objectName), sysObjMetadata); err != nil { |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
// write object specific metadata
|
|
|
|
|
if err := b.writeObjectMetadata(b.normalizeObjectName(objectName), objMetadata); err != nil { |
|
|
|
|
if err := b.writeObjectMetadata(normalizeObjectName(objectName), objMetadata); err != nil { |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
// close all writers, when control flow reaches here
|
|
|
|
@ -329,27 +335,6 @@ func (b bucket) writeObjectMetadata(objectName string, objMetadata *ObjectMetada |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// writeSystemObjectMetadata - write donut related object metadata
|
|
|
|
|
func (b bucket) writeSystemObjectMetadata(objectName string, sysObjMetadata *SystemObjectMetadata) error { |
|
|
|
|
if sysObjMetadata == nil { |
|
|
|
|
return iodine.New(InvalidArgument{}, nil) |
|
|
|
|
} |
|
|
|
|
sysObjMetadataWriters, err := b.getDiskWriters(objectName, sysObjectMetadataConfig) |
|
|
|
|
if err != nil { |
|
|
|
|
return iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
for _, sysObjMetadataWriter := range sysObjMetadataWriters { |
|
|
|
|
defer sysObjMetadataWriter.Close() |
|
|
|
|
} |
|
|
|
|
for _, sysObjMetadataWriter := range sysObjMetadataWriters { |
|
|
|
|
jenc := json.NewEncoder(sysObjMetadataWriter) |
|
|
|
|
if err := jenc.Encode(sysObjMetadata); err != nil { |
|
|
|
|
return iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO - This a temporary normalization of objectNames, need to find a better way
|
|
|
|
|
//
|
|
|
|
|
// normalizedObjectName - all objectNames with "/" get normalized to a simple objectName
|
|
|
|
@ -358,7 +343,7 @@ func (b bucket) writeSystemObjectMetadata(objectName string, sysObjMetadata *Sys |
|
|
|
|
// user provided value - "this/is/my/deep/directory/structure"
|
|
|
|
|
// donut normalized value - "this-is-my-deep-directory-structure"
|
|
|
|
|
//
|
|
|
|
|
func (b bucket) normalizeObjectName(objectName string) string { |
|
|
|
|
func normalizeObjectName(objectName string) string { |
|
|
|
|
// replace every '/' with '-'
|
|
|
|
|
return strings.Replace(objectName, "/", "-", -1) |
|
|
|
|
} |
|
|
|
@ -407,12 +392,7 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// readEncodedData -
|
|
|
|
|
func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, sysObjMetadata SystemObjectMetadata) { |
|
|
|
|
expectedMd5sum, err := hex.DecodeString(sysObjMetadata.MD5Sum) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) { |
|
|
|
|
readers, err := b.getDiskReaders(objectName, "data") |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
@ -421,22 +401,27 @@ func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, sysObj |
|
|
|
|
for _, reader := range readers { |
|
|
|
|
defer reader.Close() |
|
|
|
|
} |
|
|
|
|
expectedMd5sum, err := hex.DecodeString(objMetadata.MD5Sum) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
hasher := md5.New() |
|
|
|
|
mwriter := io.MultiWriter(writer, hasher) |
|
|
|
|
switch len(readers) == 1 { |
|
|
|
|
case false: |
|
|
|
|
if sysObjMetadata.ErasureTechnique == "" { |
|
|
|
|
if objMetadata.ErasureTechnique == "" { |
|
|
|
|
writer.CloseWithError(iodine.New(MissingErasureTechnique{}, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
encoder, err := newEncoder(sysObjMetadata.DataDisks, sysObjMetadata.ParityDisks, sysObjMetadata.ErasureTechnique) |
|
|
|
|
encoder, err := newEncoder(objMetadata.DataDisks, objMetadata.ParityDisks, objMetadata.ErasureTechnique) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
totalLeft := sysObjMetadata.Size |
|
|
|
|
for i := 0; i < sysObjMetadata.ChunkCount; i++ { |
|
|
|
|
decodedData, err := b.decodeEncodedData(totalLeft, int64(sysObjMetadata.BlockSize), readers, encoder, writer) |
|
|
|
|
totalLeft := objMetadata.Size |
|
|
|
|
for i := 0; i < objMetadata.ChunkCount; i++ { |
|
|
|
|
decodedData, err := b.decodeEncodedData(totalLeft, int64(objMetadata.BlockSize), readers, encoder, writer) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
@ -446,7 +431,7 @@ func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, sysObj |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
totalLeft = totalLeft - int64(sysObjMetadata.BlockSize) |
|
|
|
|
totalLeft = totalLeft - int64(objMetadata.BlockSize) |
|
|
|
|
} |
|
|
|
|
case true: |
|
|
|
|
_, err := io.Copy(writer, readers[0]) |
|
|
|
|