Add delimiter and prefix tests, also add inmemory to support delimiters

master
Harshavardhana 10 years ago
parent f907068798
commit 8866b2cc5f
  1. 73
      pkg/storage/inmemory/inmemory.go
  2. 45
      pkg/storage/storage_api_suite.go

@ -17,6 +17,7 @@
package inmemory package inmemory
import ( import (
"bufio"
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
@ -66,9 +67,8 @@ func start(ctrlChannel <-chan string, errorChannel chan<- error) {
// CopyObjectToWriter - GET object from memory buffer // CopyObjectToWriter - GET object from memory buffer
func (storage *Storage) CopyObjectToWriter(w io.Writer, bucket string, object string) (int64, error) { func (storage *Storage) CopyObjectToWriter(w io.Writer, bucket string, object string) (int64, error) {
// TODO synchronize access
// get object // get object
key := bucket + ":" + object key := object
if val, ok := storage.objectdata[key]; ok { if val, ok := storage.objectdata[key]; ok {
objectBuffer := bytes.NewBuffer(val.data) objectBuffer := bytes.NewBuffer(val.data)
written, err := io.Copy(w, objectBuffer) written, err := io.Copy(w, objectBuffer)
@ -92,13 +92,11 @@ func (storage *Storage) StoreObject(bucket, key, contentType string, data io.Rea
storage.lock.Lock() storage.lock.Lock()
defer storage.lock.Unlock() defer storage.lock.Unlock()
objectKey := bucket + ":" + key
if _, ok := storage.bucketdata[bucket]; ok == false { if _, ok := storage.bucketdata[bucket]; ok == false {
return mstorage.BucketNotFound{Bucket: bucket} return mstorage.BucketNotFound{Bucket: bucket}
} }
if _, ok := storage.objectdata[objectKey]; ok == true { if _, ok := storage.objectdata[key]; ok == true {
return mstorage.ObjectExists{Bucket: bucket, Object: key} return mstorage.ObjectExists{Bucket: bucket, Object: key}
} }
@ -124,7 +122,7 @@ func (storage *Storage) StoreObject(bucket, key, contentType string, data io.Rea
} }
newObject.data = bytesBuffer.Bytes() newObject.data = bytesBuffer.Bytes()
} }
storage.objectdata[objectKey] = newObject storage.objectdata[key] = newObject
return nil return nil
} }
@ -149,6 +147,24 @@ func (storage *Storage) StoreBucket(bucketName string) error {
return nil return nil
} }
func delimiter(object, delimiter string) string {
readBuffer := bytes.NewBufferString(object)
reader := bufio.NewReader(readBuffer)
stringReader := strings.NewReader(delimiter)
delimited, _ := stringReader.ReadByte()
delimitedStr, _ := reader.ReadString(delimited)
return delimitedStr
}
func appendUniq(slice []string, i string) []string {
for _, ele := range slice {
if ele == i {
return slice
}
}
return append(slice, i)
}
// ListObjects - list objects from memory // ListObjects - list objects from memory
func (storage *Storage) ListObjects(bucket string, resources mstorage.BucketResourcesMetadata) ([]mstorage.ObjectMetadata, mstorage.BucketResourcesMetadata, error) { func (storage *Storage) ListObjects(bucket string, resources mstorage.BucketResourcesMetadata) ([]mstorage.ObjectMetadata, mstorage.BucketResourcesMetadata, error) {
if _, ok := storage.bucketdata[bucket]; ok == false { if _, ok := storage.bucketdata[bucket]; ok == false {
@ -157,10 +173,43 @@ func (storage *Storage) ListObjects(bucket string, resources mstorage.BucketReso
var results []mstorage.ObjectMetadata var results []mstorage.ObjectMetadata
var keys []string var keys []string
for key := range storage.objectdata { for key := range storage.objectdata {
if strings.HasPrefix(key, bucket+":"+resources.Prefix) { switch true {
keys = append(keys, key) // Prefix absent, delimit object key based on delimiter
case resources.Delimiter != "" && resources.Prefix == "":
delimitedName := delimiter(key, resources.Delimiter)
switch true {
case delimitedName == "" || delimitedName == key:
keys = appendUniq(keys, key)
case delimitedName != "":
resources.CommonPrefixes = appendUniq(resources.CommonPrefixes, delimitedName)
}
// Prefix present, delimit object key with prefix key based on delimiter
case resources.Delimiter != "" && resources.Prefix != "" && strings.HasPrefix(key, resources.Prefix):
trimmedName := strings.TrimPrefix(key, resources.Prefix)
delimitedName := delimiter(trimmedName, resources.Delimiter)
fmt.Println(trimmedName, delimitedName, key, resources.Prefix)
switch true {
case key == resources.Prefix:
keys = appendUniq(keys, key)
// DelimitedName - requires resources.Prefix as it was trimmed off earlier in the flow
case key == resources.Prefix+delimitedName:
keys = appendUniq(keys, key)
case delimitedName != "":
if delimitedName == resources.Delimiter {
resources.CommonPrefixes = appendUniq(resources.CommonPrefixes, resources.Prefix+delimitedName)
} else {
resources.CommonPrefixes = appendUniq(resources.CommonPrefixes, delimitedName)
}
}
// Prefix present, nothing to delimit
case resources.Delimiter == "" && resources.Prefix != "" && strings.HasPrefix(key, resources.Prefix):
keys = appendUniq(keys, key)
// Prefix and delimiter absent
case resources.Prefix == "" && resources.Delimiter == "":
keys = appendUniq(keys, key)
} }
} }
sort.Strings(keys) sort.Strings(keys)
for _, key := range keys { for _, key := range keys {
if len(results) == resources.Maxkeys { if len(results) == resources.Maxkeys {
@ -168,9 +217,7 @@ func (storage *Storage) ListObjects(bucket string, resources mstorage.BucketReso
} }
object := storage.objectdata[key] object := storage.objectdata[key]
if bucket == object.metadata.Bucket { if bucket == object.metadata.Bucket {
if strings.HasPrefix(key, bucket+":"+resources.Prefix) { results = append(results, object.metadata)
results = append(results, object.metadata)
}
} }
} }
return results, resources, nil return results, resources, nil
@ -199,9 +246,7 @@ func (storage *Storage) ListBuckets() ([]mstorage.BucketMetadata, error) {
// GetObjectMetadata - get object metadata from memory // GetObjectMetadata - get object metadata from memory
func (storage *Storage) GetObjectMetadata(bucket, key, prefix string) (mstorage.ObjectMetadata, error) { func (storage *Storage) GetObjectMetadata(bucket, key, prefix string) (mstorage.ObjectMetadata, error) {
objectKey := bucket + ":" + key if object, ok := storage.objectdata[key]; ok == true {
if object, ok := storage.objectdata[objectKey]; ok == true {
return object.metadata, nil return object.metadata, nil
} }
return mstorage.ObjectMetadata{}, mstorage.ObjectNotFound{Bucket: bucket, Object: key} return mstorage.ObjectMetadata{}, mstorage.ObjectNotFound{Bucket: bucket, Object: key}

@ -87,7 +87,7 @@ func testPaging(c *check.C, create func() Storage) {
c.Assert(len(objects), check.Equals, 0) c.Assert(len(objects), check.Equals, 0)
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(resources.IsTruncated, check.Equals, false)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
// checheck before paging occurs // check before paging occurs
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
storage.StoreObject("bucket", key, "", bytes.NewBufferString(key)) storage.StoreObject("bucket", key, "", bytes.NewBufferString(key))
@ -97,7 +97,7 @@ func testPaging(c *check.C, create func() Storage) {
c.Assert(resources.IsTruncated, check.Equals, false) c.Assert(resources.IsTruncated, check.Equals, false)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
} }
// checheck after paging occurs pages work // check after paging occurs pages work
for i := 6; i <= 10; i++ { for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i) key := "obj" + strconv.Itoa(i)
storage.StoreObject("bucket", key, "", bytes.NewBufferString(key)) storage.StoreObject("bucket", key, "", bytes.NewBufferString(key))
@ -107,7 +107,7 @@ func testPaging(c *check.C, create func() Storage) {
c.Assert(resources.IsTruncated, check.Equals, true) c.Assert(resources.IsTruncated, check.Equals, true)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
} }
// checheck paging with prefix at end returns less objects // check paging with prefix at end returns less objects
{ {
storage.StoreObject("bucket", "newPrefix", "", bytes.NewBufferString("prefix1")) storage.StoreObject("bucket", "newPrefix", "", bytes.NewBufferString("prefix1"))
storage.StoreObject("bucket", "newPrefix2", "", bytes.NewBufferString("prefix2")) storage.StoreObject("bucket", "newPrefix2", "", bytes.NewBufferString("prefix2"))
@ -117,7 +117,7 @@ func testPaging(c *check.C, create func() Storage) {
c.Assert(len(objects), check.Equals, 2) c.Assert(len(objects), check.Equals, 2)
} }
// checheck ordering of pages // check ordering of pages
{ {
resources.Prefix = "" resources.Prefix = ""
resources.Maxkeys = 1000 resources.Maxkeys = 1000
@ -128,19 +128,50 @@ func testPaging(c *check.C, create func() Storage) {
c.Assert(objects[3].Key, check.Equals, "obj1") c.Assert(objects[3].Key, check.Equals, "obj1")
c.Assert(objects[4].Key, check.Equals, "obj10") c.Assert(objects[4].Key, check.Equals, "obj10")
} }
// checheck ordering of results with prefix
// check delimited results with delimiter and prefix
{
storage.StoreObject("bucket", "this/is/delimited", "", bytes.NewBufferString("prefix1"))
storage.StoreObject("bucket", "this/is/also/delimited", "", bytes.NewBufferString("prefix2"))
var prefixes []string
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = "this/is/"
resources.Maxkeys = 10
objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(len(objects), check.Equals, 1)
c.Assert(resources.CommonPrefixes[0], check.Equals, "also/")
}
// check delimited results with delimiter without prefix
{
var prefixes []string
resources.CommonPrefixes = prefixes // allocate new everytime
resources.Delimiter = "/"
resources.Prefix = ""
resources.Maxkeys = 1000
objects, resources, err = storage.ListObjects("bucket", resources)
c.Assert(objects[0].Key, check.Equals, "newPrefix")
c.Assert(objects[1].Key, check.Equals, "newPrefix2")
c.Assert(objects[2].Key, check.Equals, "obj0")
c.Assert(objects[3].Key, check.Equals, "obj1")
c.Assert(objects[4].Key, check.Equals, "obj10")
c.Assert(resources.CommonPrefixes[0], check.Equals, "this/")
}
// check ordering of results with prefix
{ {
resources.Prefix = "obj" resources.Prefix = "obj"
resources.Delimiter = ""
resources.Maxkeys = 1000 resources.Maxkeys = 1000
objects, resources, err = storage.ListObjects("bucket", resources) objects, resources, err = storage.ListObjects("bucket", resources)
c.Log(objects)
c.Assert(objects[0].Key, check.Equals, "obj0") c.Assert(objects[0].Key, check.Equals, "obj0")
c.Assert(objects[1].Key, check.Equals, "obj1") c.Assert(objects[1].Key, check.Equals, "obj1")
c.Assert(objects[2].Key, check.Equals, "obj10") c.Assert(objects[2].Key, check.Equals, "obj10")
c.Assert(objects[3].Key, check.Equals, "obj2") c.Assert(objects[3].Key, check.Equals, "obj2")
c.Assert(objects[4].Key, check.Equals, "obj3") c.Assert(objects[4].Key, check.Equals, "obj3")
} }
// checheck ordering of results with prefix and no paging // check ordering of results with prefix and no paging
{ {
resources.Prefix = "new" resources.Prefix = "new"
resources.Maxkeys = 5 resources.Maxkeys = 5

Loading…
Cancel
Save