You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
246 lines
6.5 KiB
246 lines
6.5 KiB
package donutmem
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"github.com/minio-io/minio/pkg/donutbox"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type bucket struct {
|
|
name string
|
|
metadata map[string]string
|
|
objects map[string]*object
|
|
lock *sync.RWMutex
|
|
}
|
|
|
|
type object struct {
|
|
name string
|
|
data []byte
|
|
metadata map[string]string
|
|
lock *sync.RWMutex
|
|
}
|
|
|
|
type donutMem struct {
|
|
buckets map[string]*bucket
|
|
lock *sync.RWMutex
|
|
}
|
|
|
|
// NewDonutMem creates a new in memory donut
|
|
func NewDonutMem() donutbox.DonutBox {
|
|
return donutMem{
|
|
buckets: make(map[string]*bucket),
|
|
lock: new(sync.RWMutex),
|
|
}
|
|
}
|
|
|
|
// system operations
|
|
func (donutMem donutMem) ListBuckets() ([]string, error) {
|
|
donutMem.lock.RLock()
|
|
defer donutMem.lock.RUnlock()
|
|
var buckets []string
|
|
for k := range donutMem.buckets {
|
|
buckets = append(buckets, k)
|
|
}
|
|
return buckets, nil
|
|
}
|
|
|
|
// bucket operations
|
|
func (donutMem donutMem) CreateBucket(b string) error {
|
|
donutMem.lock.Lock()
|
|
defer donutMem.lock.Unlock()
|
|
b = strings.ToLower(b)
|
|
if _, ok := donutMem.buckets[b]; ok {
|
|
return errors.New("Bucket Exists")
|
|
}
|
|
metadata := make(map[string]string)
|
|
metadata["name"] = b
|
|
metadata["created"] = time.Now().Format(time.RFC3339Nano)
|
|
newBucket := bucket{
|
|
name: b,
|
|
metadata: metadata,
|
|
objects: make(map[string]*object),
|
|
lock: new(sync.RWMutex),
|
|
}
|
|
donutMem.buckets[b] = &newBucket
|
|
return nil
|
|
}
|
|
|
|
func (donutMem donutMem) ListObjectsInBucket(bucketKey, prefixKey string) ([]string, error) {
|
|
donutMem.lock.RLock()
|
|
defer donutMem.lock.RUnlock()
|
|
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
|
|
curBucket.lock.RLock()
|
|
defer curBucket.lock.RUnlock()
|
|
objectMap := make(map[string]string)
|
|
for objectKey := range curBucket.objects {
|
|
objectName := strings.Split(objectKey, "#")[0]
|
|
if strings.HasPrefix(objectName, prefixKey) {
|
|
objectMap[objectName] = objectName
|
|
}
|
|
}
|
|
var objects []string
|
|
for k := range objectMap {
|
|
objects = append(objects, k)
|
|
}
|
|
return objects, nil
|
|
}
|
|
return nil, errors.New("Bucket does not exist")
|
|
}
|
|
|
|
func (donutMem donutMem) GetBucketMetadata(bucketKey string) (map[string]string, error) {
|
|
donutMem.lock.RLock()
|
|
defer donutMem.lock.RUnlock()
|
|
|
|
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
|
|
curBucket.lock.RLock()
|
|
defer curBucket.lock.RUnlock()
|
|
result := make(map[string]string)
|
|
for k, v := range curBucket.metadata {
|
|
result[k] = v
|
|
}
|
|
return result, nil
|
|
}
|
|
return nil, errors.New("Bucket not found")
|
|
}
|
|
|
|
func (donutMem donutMem) SetBucketMetadata(bucketKey string, metadata map[string]string) error {
|
|
donutMem.lock.RLock()
|
|
defer donutMem.lock.RUnlock()
|
|
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
|
|
curBucket.lock.Lock()
|
|
defer curBucket.lock.Unlock()
|
|
newMetadata := make(map[string]string)
|
|
for k, v := range metadata {
|
|
newMetadata[k] = v
|
|
}
|
|
curBucket.metadata = newMetadata
|
|
return nil
|
|
}
|
|
return errors.New("Bucket not found")
|
|
}
|
|
|
|
// object operations
|
|
func (donutMem donutMem) GetObjectWriter(bucketKey, objectKey string, column uint) (*donutbox.NewObject, error) {
|
|
key := getKey(bucketKey, objectKey, column)
|
|
reader, writer := io.Pipe()
|
|
returnObject := donutbox.CreateNewObject(writer)
|
|
donutMem.lock.RLock()
|
|
defer donutMem.lock.RUnlock()
|
|
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
|
|
curBucket.lock.Lock()
|
|
defer curBucket.lock.Unlock()
|
|
if _, ok := curBucket.objects[key]; !ok {
|
|
newObject := object{
|
|
name: key,
|
|
data: make([]byte, 0),
|
|
lock: new(sync.RWMutex),
|
|
}
|
|
|
|
newObject.lock.Lock()
|
|
curBucket.objects[key] = &newObject
|
|
go func() {
|
|
defer newObject.lock.Unlock()
|
|
var objBuffer bytes.Buffer
|
|
|
|
_, err := io.Copy(&objBuffer, reader)
|
|
if err == nil {
|
|
newObject.data = objBuffer.Bytes()
|
|
writer.Close()
|
|
|
|
metadata := returnObject.GetMetadata()
|
|
for k, v := range metadata {
|
|
metadata[k] = v
|
|
}
|
|
metadata["key"] = objectKey
|
|
metadata["column"] = strconv.FormatUint(uint64(column), 10)
|
|
newObject.metadata = metadata
|
|
|
|
return
|
|
}
|
|
|
|
donutMem.lock.RLock()
|
|
defer donutMem.lock.RUnlock()
|
|
bucket, _ := donutMem.buckets[bucketKey]
|
|
bucket.lock.Lock()
|
|
defer bucket.lock.Unlock()
|
|
delete(bucket.objects, key)
|
|
writer.CloseWithError(err)
|
|
}()
|
|
return returnObject, nil
|
|
}
|
|
writer.CloseWithError(errors.New("Object exists"))
|
|
return nil, errors.New("Object exists")
|
|
}
|
|
writer.CloseWithError(errors.New("Bucket does not exist"))
|
|
return nil, errors.New("Bucket does not exist")
|
|
}
|
|
|
|
func (donutMem donutMem) GetObjectReader(bucketKey, objectKey string, column uint) (io.Reader, error) {
|
|
key := getKey(bucketKey, objectKey, column)
|
|
donutMem.lock.RLock()
|
|
defer donutMem.lock.RUnlock()
|
|
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
|
|
curBucket.lock.RLock()
|
|
defer curBucket.lock.RUnlock()
|
|
if curObject, ok := curBucket.objects[key]; ok {
|
|
curObject.lock.RLock()
|
|
defer curObject.lock.RUnlock()
|
|
return bytes.NewBuffer(curObject.data), nil
|
|
}
|
|
return nil, errors.New("Object not found")
|
|
}
|
|
return nil, errors.New("Bucket not found")
|
|
}
|
|
|
|
//func (donutMem donutMem) SetObjectMetadata(bucketKey, objectKey string, column uint, metadata map[string]string) error {
|
|
// key := getKey(bucketKey, objectKey, column)
|
|
// donutMem.lock.RLock()
|
|
// defer donutMem.lock.RUnlock()
|
|
// if curBucket, ok := donutMem.buckets[bucketKey]; ok {
|
|
// curBucket.lock.RLock()
|
|
// defer curBucket.lock.RUnlock()
|
|
// if curObject, ok := curBucket.objects[key]; ok {
|
|
// curObject.lock.Lock()
|
|
// defer curObject.lock.Unlock()
|
|
// newMetadata := make(map[string]string)
|
|
// for k, v := range metadata {
|
|
// newMetadata[k] = v
|
|
// }
|
|
// curObject.metadata = newMetadata
|
|
// return nil
|
|
// }
|
|
// return errors.New("Object not found")
|
|
// }
|
|
// return errors.New("Bucket not found")
|
|
//}
|
|
|
|
func (donutMem donutMem) GetObjectMetadata(bucketKey, objectKey string, column uint) (map[string]string, error) {
|
|
key := getKey(bucketKey, objectKey, column)
|
|
donutMem.lock.RLock()
|
|
defer donutMem.lock.RUnlock()
|
|
|
|
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
|
|
curBucket.lock.RLock()
|
|
defer curBucket.lock.RUnlock()
|
|
if curObject, ok := curBucket.objects[key]; ok {
|
|
curObject.lock.RLock()
|
|
defer curObject.lock.RUnlock()
|
|
result := make(map[string]string)
|
|
for k, v := range curObject.metadata {
|
|
result[k] = v
|
|
}
|
|
return result, nil
|
|
}
|
|
return nil, errors.New("Object not Found: " + bucketKey + "#" + objectKey)
|
|
}
|
|
return nil, errors.New("Bucket not found")
|
|
}
|
|
|
|
func getKey(bucketKey, objectKey string, column uint) string {
|
|
return objectKey + "#" + strconv.FormatUint(uint64(column), 10)
|
|
}
|
|
|