Integrate cache with donut, add tests

master
Harshavardhana 10 years ago
parent 0533abf6a8
commit bce93c1b3a
  1. 24
      pkg/storage/donut/cache-multipart.go
  2. 55
      pkg/storage/donut/cache.go
  3. 260
      pkg/storage/donut/donut_test.go
  4. 10
      pkg/storage/donut/utils.go

@ -217,23 +217,23 @@ func (cache Cache) cleanupMultiparts(bucket, key, uploadID string) {
}
// CompleteMultipartUpload -
func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) {
if !IsValidBucket(bucket) {
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
// Verify upload id
cache.lock.RLock()
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
cache.lock.RUnlock()
return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
cache.lock.RUnlock()
@ -245,21 +245,21 @@ func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts m
object, ok := cache.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i))
if ok == false {
cache.lock.Unlock()
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
return ObjectMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
size += int64(len(object))
calcMD5Bytes := md5.Sum(object)
// complete multi part request header md5sum per part is hex encoded
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
if err != nil {
return "", iodine.New(InvalidDigest{Md5: recvMD5}, nil)
return ObjectMetadata{}, iodine.New(InvalidDigest{Md5: recvMD5}, nil)
}
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
return "", iodine.New(BadDigest{}, nil)
return ObjectMetadata{}, iodine.New(BadDigest{}, nil)
}
_, err = io.Copy(&fullObject, bytes.NewBuffer(object))
if err != nil {
return "", iodine.New(err, nil)
return ObjectMetadata{}, iodine.New(err, nil)
}
object = nil
go debug.FreeOSMemory()
@ -269,16 +269,16 @@ func (cache Cache) CompleteMultipartUpload(bucket, key, uploadID string, parts m
md5sumSlice := md5.Sum(fullObject.Bytes())
// this is needed for final verification inside CreateObject, do not convert this to hex
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
etag, err := cache.CreateObject(bucket, key, "", md5sum, size, &fullObject)
objectMetadata, err := cache.CreateObject(bucket, key, "", md5sum, size, &fullObject)
if err != nil {
// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
// which would in-turn cleanup properly in accordance with S3 Spec
return "", iodine.New(err, nil)
return ObjectMetadata{}, iodine.New(err, nil)
}
fullObject.Reset()
cache.cleanupMultiparts(bucket, key, uploadID)
cache.cleanupMultipartSession(bucket, key, uploadID)
return etag, nil
return objectMetadata, nil
}
// byKey is a sortable interface for UploadMetadata slice

@ -93,6 +93,9 @@ func NewCache(maxSize uint64, expiration time.Duration, donutName string, nodeDi
c.multiPartObjects = trove.NewCache(0, time.Duration(0))
c.objects.OnExpired = c.expiredObject
c.multiPartObjects.OnExpired = c.expiredPart
c.lock = new(sync.RWMutex)
c.maxSize = maxSize
c.expiration = expiration
// set up cache expiration
c.objects.ExpireObjects(time.Second * 5)
@ -262,42 +265,42 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
}
// CreateObject -
func (cache Cache) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
func (cache Cache) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) {
if size > int64(cache.maxSize) {
generic := GenericObjectError{Bucket: bucket, Object: key}
return "", iodine.New(EntityTooLarge{
return ObjectMetadata{}, iodine.New(EntityTooLarge{
GenericObjectError: generic,
Size: strconv.FormatInt(size, 10),
MaxSize: strconv.FormatUint(cache.maxSize, 10),
}, nil)
}
md5sum, err := cache.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
objectMetadata, err := cache.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
// free
debug.FreeOSMemory()
return md5sum, iodine.New(err, nil)
return objectMetadata, iodine.New(err, nil)
}
// createObject - PUT object to cache buffer
func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) {
cache.lock.RLock()
if !IsValidBucket(bucket) {
cache.lock.RUnlock()
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
cache.lock.RUnlock()
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
cache.lock.RUnlock()
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := cache.storedBuckets[bucket]
// get object key
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
cache.lock.RUnlock()
return "", iodine.New(ObjectExists{Object: key}, nil)
return ObjectMetadata{}, iodine.New(ObjectExists{Object: key}, nil)
}
cache.lock.RUnlock()
@ -309,7 +312,7 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string,
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return "", iodine.New(InvalidDigest{Md5: expectedMD5Sum}, nil)
return ObjectMetadata{}, iodine.New(InvalidDigest{Md5: expectedMD5Sum}, nil)
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
@ -332,7 +335,7 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string,
readBytes = append(readBytes, byteBuffer[0:length]...)
}
if err != io.EOF {
return "", iodine.New(err, nil)
return ObjectMetadata{}, iodine.New(err, nil)
}
md5SumBytes := hash.Sum(nil)
totalLength := len(readBytes)
@ -344,14 +347,14 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string,
go debug.FreeOSMemory()
cache.lock.Unlock()
if !ok {
return "", iodine.New(InternalError{}, nil)
return ObjectMetadata{}, iodine.New(InternalError{}, nil)
}
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
return "", iodine.New(BadDigest{}, nil)
return ObjectMetadata{}, iodine.New(BadDigest{}, nil)
}
}
@ -371,11 +374,11 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string,
storedBucket.objectMetadata[objectKey] = newObject
cache.storedBuckets[bucket] = storedBucket
cache.lock.Unlock()
return newObject.MD5Sum, nil
return newObject, nil
}
// CreateBucket - create bucket in cache
func (cache Cache) CreateBucket(bucketName, acl string) error {
// MakeBucket - create bucket in cache
func (cache Cache) MakeBucket(bucketName, acl string) error {
cache.lock.RLock()
if len(cache.storedBuckets) == totalBuckets {
cache.lock.RUnlock()
@ -418,22 +421,21 @@ func (cache Cache) CreateBucket(bucketName, acl string) error {
return nil
}
func (cache Cache) filterDelimiterPrefix(keys []string, key, prefix, delim string) ([]string, []string) {
var commonPrefixes []string
func (cache Cache) filterDelimiterPrefix(keys []string, commonPrefixes []string, key, prefix, delim string) ([]string, []string) {
switch true {
case key == prefix:
keys = append(keys, key)
// delim - requires r.Prefix as it was trimmed off earlier
case key == prefix+delim:
keys = append(keys, key)
fallthrough
case delim != "":
commonPrefixes = append(commonPrefixes, prefix+delim)
}
return RemoveDuplicates(keys), RemoveDuplicates(commonPrefixes)
return keys, commonPrefixes
}
func (cache Cache) listObjects(keys []string, key string, r BucketResourcesMetadata) ([]string, []string) {
var commonPrefixes []string
func (cache Cache) listObjects(keys []string, commonPrefixes []string, key string, r BucketResourcesMetadata) ([]string, []string) {
switch true {
// Prefix absent, delimit object key based on delimiter
case r.IsDelimiterSet():
@ -449,7 +451,7 @@ func (cache Cache) listObjects(keys []string, key string, r BucketResourcesMetad
if strings.HasPrefix(key, r.Prefix) {
trimmedName := strings.TrimPrefix(key, r.Prefix)
delim := Delimiter(trimmedName, r.Delimiter)
keys, commonPrefixes = cache.filterDelimiterPrefix(keys, key, r.Prefix, delim)
keys, commonPrefixes = cache.filterDelimiterPrefix(keys, commonPrefixes, key, r.Prefix, delim)
}
// Prefix present, nothing to delimit
case r.IsPrefixSet():
@ -458,7 +460,7 @@ func (cache Cache) listObjects(keys []string, key string, r BucketResourcesMetad
case r.IsDefault():
keys = append(keys, key)
}
return RemoveDuplicates(keys), RemoveDuplicates(commonPrefixes)
return keys, commonPrefixes
}
// ListObjects - list objects from cache
@ -468,7 +470,7 @@ func (cache Cache) ListObjects(bucket string, resources BucketResourcesMetadata)
if !IsValidBucket(bucket) {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(resources.Prefix) {
if !IsValidPrefix(resources.Prefix) {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(ObjectNameInvalid{Object: resources.Prefix}, nil)
}
if _, ok := cache.storedBuckets[bucket]; ok == false {
@ -476,12 +478,11 @@ func (cache Cache) ListObjects(bucket string, resources BucketResourcesMetadata)
}
var results []ObjectMetadata
var keys []string
var commonPrefixes []string
storedBucket := cache.storedBuckets[bucket]
for key := range storedBucket.objectMetadata {
if strings.HasPrefix(key, bucket+"/") {
key = key[len(bucket)+1:]
keys, commonPrefixes = cache.listObjects(keys, key, resources)
keys, resources.CommonPrefixes = cache.listObjects(keys, resources.CommonPrefixes, key, resources)
}
}
var newKeys []string
@ -508,7 +509,7 @@ func (cache Cache) ListObjects(bucket string, resources BucketResourcesMetadata)
object := storedBucket.objectMetadata[bucket+"/"+key]
results = append(results, object)
}
resources.CommonPrefixes = commonPrefixes
resources.CommonPrefixes = RemoveDuplicates(resources.CommonPrefixes)
return results, resources, nil
}

@ -19,20 +19,23 @@ package donut
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"
. "github.com/minio/check"
)
func Test(t *testing.T) { TestingT(t) }
type MySuite struct{}
type MySuite struct {
root string
}
var _ = Suite(&MySuite{})
@ -52,293 +55,224 @@ func createTestNodeDiskMap(p string) map[string][]string {
return nodes
}
// test empty donut
func (s *MySuite) TestEmptyDonut(c *C) {
var d Cache
func (s *MySuite) SetUpSuite(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
s.root = root
d = NewCache(100000, time.Duration(1*time.Hour), "test", createTestNodeDiskMap(root))
buckets, err := d.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 0)
}
// check donut is empty
metadata, err := donut.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(metadata), Equals, 0)
func (s *MySuite) TearDownSuite(c *C) {
os.RemoveAll(s.root)
}
// test make bucket without name
func (s *MySuite) TestBucketWithoutNameFails(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
// fail to create new bucket without a name
err = donut.MakeBucket("", "private")
err := d.MakeBucket("", "private")
c.Assert(err, Not(IsNil))
err = donut.MakeBucket(" ", "private")
err = d.MakeBucket(" ", "private")
c.Assert(err, Not(IsNil))
}
// test empty bucket
func (s *MySuite) TestEmptyBucket(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
c.Assert(donut.MakeBucket("foo", BucketACL("private")), IsNil)
c.Assert(d.MakeBucket("foo1", "private"), IsNil)
// check if bucket is empty
listObjects, err := donut.ListObjects("foo", "", "", "", 1)
var resources BucketResourcesMetadata
resources.Maxkeys = 1
objectsMetadata, resources, err := d.ListObjects("foo1", resources)
c.Assert(err, IsNil)
c.Assert(len(listObjects.Objects), Equals, 0)
c.Assert(listObjects.CommonPrefixes, DeepEquals, []string{})
c.Assert(listObjects.IsTruncated, Equals, false)
c.Assert(len(objectsMetadata), Equals, 0)
c.Assert(resources.CommonPrefixes, DeepEquals, []string{})
c.Assert(resources.IsTruncated, Equals, false)
}
// test bucket list
func (s *MySuite) TestMakeBucketAndList(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
// create bucket
err = donut.MakeBucket("foo", BucketACL("private"))
err := d.MakeBucket("foo2", "private")
c.Assert(err, IsNil)
// check bucket exists
buckets, err := donut.ListBuckets()
buckets, err := d.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 1)
c.Assert(buckets["foo"].ACL, Equals, BucketACL("private"))
c.Assert(len(buckets), Equals, 5)
c.Assert(buckets[0].ACL, Equals, BucketACL("private"))
}
// test re-create bucket
func (s *MySuite) TestMakeBucketWithSameNameFails(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
err = donut.MakeBucket("foo", BucketACL("private"))
err := d.MakeBucket("foo3", "private")
c.Assert(err, IsNil)
err = donut.MakeBucket("foo", BucketACL("private"))
err = d.MakeBucket("foo3", "private")
c.Assert(err, Not(IsNil))
}
// test make multiple buckets
func (s *MySuite) TestCreateMultipleBucketsAndList(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
// add a second bucket
err = donut.MakeBucket("foo", BucketACL("private"))
err := d.MakeBucket("foo4", "private")
c.Assert(err, IsNil)
err = donut.MakeBucket("bar", BucketACL("private"))
err = d.MakeBucket("bar1", "private")
c.Assert(err, IsNil)
buckets, err := donut.ListBuckets()
buckets, err := d.ListBuckets()
c.Assert(err, IsNil)
_, ok := buckets["foo"]
c.Assert(ok, Equals, true)
_, ok = buckets["bar"]
c.Assert(ok, Equals, true)
c.Assert(len(buckets), Equals, 2)
c.Assert(buckets[0].Name, Equals, "bar1")
c.Assert(buckets[1].Name, Equals, "foo4")
err = donut.MakeBucket("foobar", BucketACL("private"))
err = d.MakeBucket("foobar1", "private")
c.Assert(err, IsNil)
buckets, err = donut.ListBuckets()
buckets, err = d.ListBuckets()
c.Assert(err, IsNil)
_, ok = buckets["foobar"]
c.Assert(ok, Equals, true)
c.Assert(len(buckets), Equals, 3)
c.Assert(buckets[2].Name, Equals, "foobar1")
}
// test object create without bucket
func (s *MySuite) TestNewObjectFailsWithoutBucket(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
_, err = donut.PutObject("foo", "obj", "", nil, nil)
_, err := d.CreateObject("unknown", "obj", "", "", 0, nil)
c.Assert(err, Not(IsNil))
}
// test create object metadata
func (s *MySuite) TestNewObjectMetadata(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
metadata := make(map[string]string)
metadata["contentType"] = "application/json"
metadata["foo"] = "value1"
metadata["hello"] = "world"
data := "Hello World"
hasher := md5.New()
hasher.Write([]byte(data))
expectedMd5Sum := hex.EncodeToString(hasher.Sum(nil))
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
metadata["contentLength"] = strconv.Itoa(len(data))
err = donut.MakeBucket("foo", "private")
err := d.MakeBucket("foo6", "private")
c.Assert(err, IsNil)
objectMetadata, err := donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata)
objectMetadata, err := d.CreateObject("foo6", "obj", "application/json", expectedMd5Sum, int64(len(data)), reader)
c.Assert(err, IsNil)
c.Assert(objectMetadata.MD5Sum, Equals, expectedMd5Sum)
c.Assert(objectMetadata.Metadata["contentType"], Equals, metadata["contentType"])
c.Assert(objectMetadata.Metadata["foo"], Equals, metadata["foo"])
c.Assert(objectMetadata.Metadata["hello"], Equals, metadata["hello"])
c.Assert(objectMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
c.Assert(objectMetadata.Metadata["contentType"], Equals, "application/json")
}
// test create object fails without name
func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
_, err = donut.PutObject("foo", "", "", nil, nil)
_, err := d.CreateObject("foo", "", "", "", 0, nil)
c.Assert(err, Not(IsNil))
}
// test create object
func (s *MySuite) TestNewObjectCanBeWritten(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
err = donut.MakeBucket("foo", "private")
err := d.MakeBucket("foo", "private")
c.Assert(err, IsNil)
metadata := make(map[string]string)
metadata["contentType"] = "application/octet-stream"
data := "Hello World"
hasher := md5.New()
hasher.Write([]byte(data))
expectedMd5Sum := hex.EncodeToString(hasher.Sum(nil))
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
metadata["contentLength"] = strconv.Itoa(len(data))
actualMetadata, err := donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata)
actualMetadata, err := d.CreateObject("foo", "obj", "application/octet-stream", expectedMd5Sum, int64(len(data)), reader)
c.Assert(err, IsNil)
c.Assert(actualMetadata.MD5Sum, Equals, expectedMd5Sum)
c.Assert(actualMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
reader, size, err := donut.GetObject("foo", "obj")
var buffer bytes.Buffer
size, err := d.GetObject(&buffer, "foo", "obj")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len(data)))
c.Assert(buffer.Bytes(), DeepEquals, []byte(data))
var actualData bytes.Buffer
_, err = io.Copy(&actualData, reader)
actualMetadata, err = d.GetObjectMetadata("foo", "obj")
c.Assert(err, IsNil)
c.Assert(actualData.Bytes(), DeepEquals, []byte(data))
actualMetadata, err = donut.GetObjectMetadata("foo", "obj")
c.Assert(err, IsNil)
c.Assert(expectedMd5Sum, Equals, actualMetadata.MD5Sum)
c.Assert(hex.EncodeToString(hasher.Sum(nil)), Equals, actualMetadata.MD5Sum)
c.Assert(int64(len(data)), Equals, actualMetadata.Size)
c.Assert("1.0.0", Equals, actualMetadata.Version)
}
// test list objects
func (s *MySuite) TestMultipleNewObjects(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
c.Assert(donut.MakeBucket("foo", BucketACL("private")), IsNil)
c.Assert(d.MakeBucket("foo5", "private"), IsNil)
one := ioutil.NopCloser(bytes.NewReader([]byte("one")))
metadata := make(map[string]string)
metadata["contentLength"] = strconv.Itoa(len("one"))
_, err = donut.PutObject("foo", "obj1", "", one, metadata)
_, err := d.CreateObject("foo5", "obj1", "", "", int64(len("one")), one)
c.Assert(err, IsNil)
two := ioutil.NopCloser(bytes.NewReader([]byte("two")))
metadata["contentLength"] = strconv.Itoa(len("two"))
_, err = donut.PutObject("foo", "obj2", "", two, metadata)
_, err = d.CreateObject("foo5", "obj2", "", "", int64(len("two")), two)
c.Assert(err, IsNil)
obj1, size, err := donut.GetObject("foo", "obj1")
var buffer1 bytes.Buffer
size, err := d.GetObject(&buffer1, "foo5", "obj1")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len([]byte("one"))))
c.Assert(buffer1.Bytes(), DeepEquals, []byte("one"))
var readerBuffer1 bytes.Buffer
_, err = io.CopyN(&readerBuffer1, obj1, size)
c.Assert(err, IsNil)
c.Assert(readerBuffer1.Bytes(), DeepEquals, []byte("one"))
obj2, size, err := donut.GetObject("foo", "obj2")
var buffer2 bytes.Buffer
size, err = d.GetObject(&buffer2, "foo5", "obj2")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len([]byte("two"))))
var readerBuffer2 bytes.Buffer
_, err = io.CopyN(&readerBuffer2, obj2, size)
c.Assert(err, IsNil)
c.Assert(readerBuffer2.Bytes(), DeepEquals, []byte("two"))
c.Assert(buffer2.Bytes(), DeepEquals, []byte("two"))
/// test list of objects
// test list objects with prefix and delimiter
listObjects, err := donut.ListObjects("foo", "o", "", "1", 10)
var resources BucketResourcesMetadata
resources.Prefix = "o"
resources.Delimiter = "1"
resources.Maxkeys = 10
objectsMetadata, resources, err := d.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(listObjects.IsTruncated, Equals, false)
c.Assert(listObjects.CommonPrefixes[0], Equals, "obj1")
c.Assert(resources.IsTruncated, Equals, false)
c.Assert(resources.CommonPrefixes[0], Equals, "obj1")
// test list objects with only delimiter
listObjects, err = donut.ListObjects("foo", "", "", "1", 10)
resources.Prefix = ""
resources.Delimiter = "1"
resources.Maxkeys = 10
objectsMetadata, resources, err = d.ListObjects("foo5", resources)
c.Assert(err, IsNil)
_, ok := listObjects.Objects["obj2"]
c.Assert(ok, Equals, true)
c.Assert(listObjects.IsTruncated, Equals, false)
c.Assert(listObjects.CommonPrefixes[0], Equals, "obj1")
c.Assert(objectsMetadata[0].Object, Equals, "obj1")
c.Assert(resources.IsTruncated, Equals, false)
c.Assert(resources.CommonPrefixes[0], Equals, "obj1")
// test list objects with only prefix
listObjects, err = donut.ListObjects("foo", "o", "", "", 10)
resources.Prefix = "o"
resources.Delimiter = ""
resources.Maxkeys = 10
objectsMetadata, resources, err = d.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(listObjects.IsTruncated, Equals, false)
_, ok1 := listObjects.Objects["obj1"]
_, ok2 := listObjects.Objects["obj2"]
c.Assert(ok1, Equals, true)
c.Assert(ok2, Equals, true)
c.Assert(resources.IsTruncated, Equals, false)
c.Assert(objectsMetadata[0].Object, Equals, "obj1")
c.Assert(objectsMetadata[1].Object, Equals, "obj2")
three := ioutil.NopCloser(bytes.NewReader([]byte("three")))
metadata["contentLength"] = strconv.Itoa(len("three"))
_, err = donut.PutObject("foo", "obj3", "", three, metadata)
_, err = d.CreateObject("foo5", "obj3", "", "", int64(len("three")), three)
c.Assert(err, IsNil)
obj3, size, err := donut.GetObject("foo", "obj3")
var buffer bytes.Buffer
size, err = d.GetObject(&buffer, "foo5", "obj3")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len([]byte("three"))))
var readerBuffer3 bytes.Buffer
_, err = io.CopyN(&readerBuffer3, obj3, size)
c.Assert(err, IsNil)
c.Assert(readerBuffer3.Bytes(), DeepEquals, []byte("three"))
c.Assert(buffer.Bytes(), DeepEquals, []byte("three"))
// test list objects with maxkeys
listObjects, err = donut.ListObjects("foo", "o", "", "", 2)
resources.Prefix = "o"
resources.Delimiter = ""
resources.Maxkeys = 2
objectsMetadata, resources, err = d.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(listObjects.IsTruncated, Equals, true)
c.Assert(len(listObjects.Objects), Equals, 2)
c.Assert(resources.IsTruncated, Equals, true)
c.Assert(len(objectsMetadata), Equals, 2)
}

@ -181,7 +181,7 @@ func IsValidBucket(bucket string) bool {
// - http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
func IsValidObjectName(object string) bool {
if strings.TrimSpace(object) == "" {
return true
return false
}
if len(object) > 1024 || len(object) == 0 {
return false
@ -191,3 +191,11 @@ func IsValidObjectName(object string) bool {
}
return true
}
// IsValidPrefix - verify prefix name is correct, an empty prefix is valid
func IsValidPrefix(prefix string) bool {
if strings.TrimSpace(prefix) == "" {
return true
}
return IsValidObjectName(prefix)
}

Loading…
Cancel
Save