|
|
|
@ -19,75 +19,242 @@ package donutstorage |
|
|
|
|
import ( |
|
|
|
|
"errors" |
|
|
|
|
"github.com/minio-io/minio/pkg/storage" |
|
|
|
|
"github.com/minio-io/minio/pkg/storage/donut" |
|
|
|
|
"io" |
|
|
|
|
"sort" |
|
|
|
|
"strconv" |
|
|
|
|
"strings" |
|
|
|
|
"time" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// DonutDriver creates a new single disk storage driver using donut without encoding.
|
|
|
|
|
type DonutDriver struct{} |
|
|
|
|
// Storage creates a new single disk storage driver using donut without encoding.
|
|
|
|
|
type Storage struct { |
|
|
|
|
donut donut.Donut |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
blockSize = 10 * 1024 * 1024 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// Start a single disk subsystem
|
|
|
|
|
func Start() (chan<- string, <-chan error, storage.Storage) { |
|
|
|
|
func Start(path string) (chan<- string, <-chan error, storage.Storage) { |
|
|
|
|
|
|
|
|
|
ctrlChannel := make(chan string) |
|
|
|
|
errorChannel := make(chan error) |
|
|
|
|
s := new(DonutDriver) |
|
|
|
|
s := new(Storage) |
|
|
|
|
|
|
|
|
|
// TODO donut driver should be passed in as Start param and driven by config
|
|
|
|
|
s.donut = donut.NewDonutDriver(path) |
|
|
|
|
|
|
|
|
|
go start(ctrlChannel, errorChannel, s) |
|
|
|
|
return ctrlChannel, errorChannel, s |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func start(ctrlChannel <-chan string, errorChannel chan<- error, s *DonutDriver) { |
|
|
|
|
func start(ctrlChannel <-chan string, errorChannel chan<- error, s *Storage) { |
|
|
|
|
close(errorChannel) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ListBuckets returns a list of buckets
|
|
|
|
|
func (donutStorage DonutDriver) ListBuckets() (results []storage.BucketMetadata, err error) { |
|
|
|
|
return nil, errors.New("Not Implemented") |
|
|
|
|
func (donutStorage Storage) ListBuckets() (results []storage.BucketMetadata, err error) { |
|
|
|
|
buckets, err := donutStorage.donut.ListBuckets() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
for _, bucket := range buckets { |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
result := storage.BucketMetadata{ |
|
|
|
|
Name: bucket, |
|
|
|
|
// TODO Add real created date
|
|
|
|
|
Created: time.Now(), |
|
|
|
|
} |
|
|
|
|
results = append(results, result) |
|
|
|
|
} |
|
|
|
|
return results, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// CreateBucket creates a new bucket
|
|
|
|
|
func (donutStorage DonutDriver) CreateBucket(bucket string) error { |
|
|
|
|
return errors.New("Not Implemented") |
|
|
|
|
func (donutStorage Storage) CreateBucket(bucket string) error { |
|
|
|
|
return donutStorage.donut.CreateBucket(bucket) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetBucketMetadata retrieves an bucket's metadata
|
|
|
|
|
func (donutStorage DonutDriver) GetBucketMetadata(bucket string) (storage.BucketMetadata, error) { |
|
|
|
|
func (donutStorage Storage) GetBucketMetadata(bucket string) (storage.BucketMetadata, error) { |
|
|
|
|
return storage.BucketMetadata{}, errors.New("Not Implemented") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// CreateBucketPolicy sets a bucket's access policy
|
|
|
|
|
func (donutStorage DonutDriver) CreateBucketPolicy(bucket string, p storage.BucketPolicy) error { |
|
|
|
|
func (donutStorage Storage) CreateBucketPolicy(bucket string, p storage.BucketPolicy) error { |
|
|
|
|
return errors.New("Not Implemented") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetBucketPolicy returns a bucket's access policy
|
|
|
|
|
func (donutStorage DonutDriver) GetBucketPolicy(bucket string) (storage.BucketPolicy, error) { |
|
|
|
|
func (donutStorage Storage) GetBucketPolicy(bucket string) (storage.BucketPolicy, error) { |
|
|
|
|
return storage.BucketPolicy{}, errors.New("Not Implemented") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetObject retrieves an object and writes it to a writer
|
|
|
|
|
func (donutStorage DonutDriver) GetObject(target io.Writer, bucket, key string) (int64, error) { |
|
|
|
|
return 0, errors.New("Not Implemented") |
|
|
|
|
func (donutStorage Storage) GetObject(target io.Writer, bucket, key string) (int64, error) { |
|
|
|
|
reader, err := donutStorage.donut.GetObject(bucket, key) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, storage.ObjectNotFound{ |
|
|
|
|
Bucket: bucket, |
|
|
|
|
Object: key, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return io.Copy(target, reader) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetPartialObject retrieves an object and writes it to a writer
|
|
|
|
|
func (donutStorage DonutDriver) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { |
|
|
|
|
func (donutStorage Storage) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { |
|
|
|
|
return 0, errors.New("Not Implemented") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetObjectMetadata retrieves an object's metadata
|
|
|
|
|
func (donutStorage DonutDriver) GetObjectMetadata(bucket, key string, prefix string) (storage.ObjectMetadata, error) { |
|
|
|
|
return storage.ObjectMetadata{}, errors.New("Not Implemented") |
|
|
|
|
func (donutStorage Storage) GetObjectMetadata(bucket, key string, prefix string) (storage.ObjectMetadata, error) { |
|
|
|
|
metadata, err := donutStorage.donut.GetObjectMetadata(bucket, key) |
|
|
|
|
created, err := time.Parse(time.RFC3339Nano, metadata["sys.created"]) |
|
|
|
|
if err != nil { |
|
|
|
|
return storage.ObjectMetadata{}, err |
|
|
|
|
} |
|
|
|
|
size, err := strconv.ParseInt(metadata["sys.size"], 10, 64) |
|
|
|
|
if err != nil { |
|
|
|
|
return storage.ObjectMetadata{}, err |
|
|
|
|
} |
|
|
|
|
objectMetadata := storage.ObjectMetadata{ |
|
|
|
|
Bucket: bucket, |
|
|
|
|
Key: key, |
|
|
|
|
|
|
|
|
|
ContentType: metadata["contentType"], |
|
|
|
|
Created: created, |
|
|
|
|
Md5: metadata["sys.md5"], |
|
|
|
|
Size: size, |
|
|
|
|
} |
|
|
|
|
return objectMetadata, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ListObjects lists objects
|
|
|
|
|
func (donutStorage DonutDriver) ListObjects(bucket string, resources storage.BucketResourcesMetadata) ([]storage.ObjectMetadata, storage.BucketResourcesMetadata, error) { |
|
|
|
|
return nil, storage.BucketResourcesMetadata{}, errors.New("Not Implemented") |
|
|
|
|
func (donutStorage Storage) ListObjects(bucket string, resources storage.BucketResourcesMetadata) ([]storage.ObjectMetadata, storage.BucketResourcesMetadata, error) { |
|
|
|
|
// TODO Fix IsPrefixSet && IsDelimiterSet and use them
|
|
|
|
|
objects, err := donutStorage.donut.ListObjects(bucket) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, storage.BucketResourcesMetadata{}, err |
|
|
|
|
} |
|
|
|
|
sort.Strings(objects) |
|
|
|
|
if resources.Prefix != "" { |
|
|
|
|
objects = filterPrefix(objects, resources.Prefix) |
|
|
|
|
objects = removePrefix(objects, resources.Prefix) |
|
|
|
|
} |
|
|
|
|
if resources.Maxkeys <= 0 || resources.Maxkeys > 1000 { |
|
|
|
|
resources.Maxkeys = 1000 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var actualObjects []string |
|
|
|
|
var commonPrefixes []string |
|
|
|
|
if strings.TrimSpace(resources.Delimiter) != "" { |
|
|
|
|
actualObjects = filterDelimited(objects, resources.Delimiter) |
|
|
|
|
commonPrefixes = filterNotDelimited(objects, resources.Delimiter) |
|
|
|
|
commonPrefixes = extractDir(commonPrefixes, resources.Delimiter) |
|
|
|
|
commonPrefixes = uniqueObjects(commonPrefixes) |
|
|
|
|
resources.CommonPrefixes = commonPrefixes |
|
|
|
|
} else { |
|
|
|
|
actualObjects = objects |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var results []storage.ObjectMetadata |
|
|
|
|
for _, object := range actualObjects { |
|
|
|
|
if len(results) >= resources.Maxkeys { |
|
|
|
|
resources.IsTruncated = true |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
metadata, err := donutStorage.GetObjectMetadata(bucket, resources.Prefix+object, "") |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, storage.BucketResourcesMetadata{}, err |
|
|
|
|
} |
|
|
|
|
results = append(results, metadata) |
|
|
|
|
} |
|
|
|
|
return results, resources, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func filterPrefix(objects []string, prefix string) []string { |
|
|
|
|
var results []string |
|
|
|
|
for _, object := range objects { |
|
|
|
|
if strings.HasPrefix(object, prefix) { |
|
|
|
|
results = append(results, object) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return results |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func removePrefix(objects []string, prefix string) []string { |
|
|
|
|
var results []string |
|
|
|
|
for _, object := range objects { |
|
|
|
|
results = append(results, strings.TrimPrefix(object, prefix)) |
|
|
|
|
} |
|
|
|
|
return results |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func filterDelimited(objects []string, delim string) []string { |
|
|
|
|
var results []string |
|
|
|
|
for _, object := range objects { |
|
|
|
|
if !strings.Contains(object, delim) { |
|
|
|
|
results = append(results, object) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return results |
|
|
|
|
} |
|
|
|
|
func filterNotDelimited(objects []string, delim string) []string { |
|
|
|
|
var results []string |
|
|
|
|
for _, object := range objects { |
|
|
|
|
if strings.Contains(object, delim) { |
|
|
|
|
results = append(results, object) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return results |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func extractDir(objects []string, delim string) []string { |
|
|
|
|
var results []string |
|
|
|
|
for _, object := range objects { |
|
|
|
|
parts := strings.Split(object, delim) |
|
|
|
|
results = append(results, parts[0]+"/") |
|
|
|
|
} |
|
|
|
|
return results |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func uniqueObjects(objects []string) []string { |
|
|
|
|
objectMap := make(map[string]string) |
|
|
|
|
for _, v := range objects { |
|
|
|
|
objectMap[v] = v |
|
|
|
|
} |
|
|
|
|
var results []string |
|
|
|
|
for k := range objectMap { |
|
|
|
|
results = append(results, k) |
|
|
|
|
} |
|
|
|
|
sort.Strings(results) |
|
|
|
|
return results |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// CreateObject creates a new object
|
|
|
|
|
func (donutStorage DonutDriver) CreateObject(bucketKey, objectKey, contentType, md5sum string, reader io.Reader) error { |
|
|
|
|
return errors.New("Not Implemented") |
|
|
|
|
func (donutStorage Storage) CreateObject(bucketKey, objectKey, contentType, expectedMd5sum string, reader io.Reader) error { |
|
|
|
|
writer, err := donutStorage.donut.GetObjectWriter(bucketKey, objectKey) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if _, err := io.Copy(writer, reader); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if contentType == "" { |
|
|
|
|
contentType = "application/octet-stream" |
|
|
|
|
} |
|
|
|
|
contentType = strings.TrimSpace(contentType) |
|
|
|
|
metadata := make(map[string]string) |
|
|
|
|
metadata["bucket"] = bucketKey |
|
|
|
|
metadata["object"] = objectKey |
|
|
|
|
metadata["contentType"] = contentType |
|
|
|
|
if err = writer.SetMetadata(metadata); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
return writer.Close() |
|
|
|
|
} |
|
|
|
|