Merge pull request #679 from harshavardhana/pr_out_object_metadata_was_wrongly_misconstrued_to_be_mutable_handle_it

master
Harshavardhana 10 years ago
commit 9c2e861470
  1. 16
      pkg/storage/donut/bucket.go
  2. 15
      pkg/storage/donut/definitions.go
  3. 22
      pkg/storage/donut/donut.go
  4. 10
      pkg/storage/donut/donut_test.go
  5. 2
      pkg/storage/donut/interfaces.go
  6. 16
      pkg/storage/drivers/donut/donut.go

@ -36,6 +36,10 @@ import (
"github.com/minio/minio/pkg/utils/split" "github.com/minio/minio/pkg/utils/split"
) )
const (
blockSize = 10 * 1024 * 1024
)
// internal struct carrying bucket specific information // internal struct carrying bucket specific information
type bucket struct { type bucket struct {
name string name string
@ -72,7 +76,7 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bu
metadata.ACL = aclType metadata.ACL = aclType
metadata.Created = t metadata.Created = t
metadata.Metadata = make(map[string]string) metadata.Metadata = make(map[string]string)
metadata.BucketObjectsMetadata = make(map[string]map[string]string) metadata.BucketObjects = make(map[string]interface{})
return b, metadata, nil return b, metadata, nil
} }
@ -153,7 +157,7 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) ([]st
if err != nil { if err != nil {
return nil, nil, false, iodine.New(err, nil) return nil, nil, false, iodine.New(err, nil)
} }
for objectName := range bucketMetadata.Buckets[b.getBucketName()].BucketObjectsMetadata { for objectName := range bucketMetadata.Buckets[b.getBucketName()].BucketObjects {
if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) { if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) {
if objectName > marker { if objectName > marker {
objects = appendUniq(objects, objectName) objects = appendUniq(objects, objectName)
@ -203,7 +207,7 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64,
return nil, 0, iodine.New(err, nil) return nil, 0, iodine.New(err, nil)
} }
// check if object exists // check if object exists
if _, ok := bucketMetadata.Buckets[b.getBucketName()].BucketObjectsMetadata[objectName]; !ok { if _, ok := bucketMetadata.Buckets[b.getBucketName()].BucketObjects[objectName]; !ok {
return nil, 0, iodine.New(ObjectNotFound{Object: objectName}, nil) return nil, 0, iodine.New(ObjectNotFound{Object: objectName}, nil)
} }
objMetadata := ObjectMetadata{} objMetadata := ObjectMetadata{}
@ -227,7 +231,7 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64,
} }
// WriteObject - write a new object into bucket // WriteObject - write a new object into bucket
func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string) (string, error) { func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string) (string, error) {
b.lock.Lock() b.lock.Lock()
defer b.lock.Unlock() defer b.lock.Unlock()
if objectName == "" || objectData == nil { if objectName == "" || objectData == nil {
@ -263,7 +267,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
} }
/// donutMetadata section /// donutMetadata section
objMetadata.BlockSize = 10 * 1024 * 1024 objMetadata.BlockSize = blockSize
objMetadata.ChunkCount = chunkCount objMetadata.ChunkCount = chunkCount
objMetadata.DataDisks = k objMetadata.DataDisks = k
objMetadata.ParityDisks = m objMetadata.ParityDisks = m
@ -284,6 +288,8 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
} }
} }
objMetadata.Metadata = metadata
// write object specific metadata // write object specific metadata
if err := b.writeObjectMetadata(normalizeObjectName(objectName), objMetadata); err != nil { if err := b.writeObjectMetadata(normalizeObjectName(objectName), objMetadata); err != nil {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)

@ -39,6 +39,9 @@ type ObjectMetadata struct {
// checksums // checksums
MD5Sum string `json:"sys.md5sum"` MD5Sum string `json:"sys.md5sum"`
SHA512Sum string `json:"sys.sha512sum"` SHA512Sum string `json:"sys.sha512sum"`
// metadata
Metadata map[string]string `json:"metadata"`
} }
// Metadata container for donut metadata // Metadata container for donut metadata
@ -54,10 +57,10 @@ type AllBuckets struct {
// BucketMetadata container for bucket level metadata // BucketMetadata container for bucket level metadata
type BucketMetadata struct { type BucketMetadata struct {
Version string `json:"version"` Version string `json:"version"`
Name string `json:"name"` Name string `json:"name"`
ACL string `json:"acl"` ACL string `json:"acl"`
Created time.Time `json:"created"` Created time.Time `json:"created"`
Metadata map[string]string `json:"metadata"` Metadata map[string]string `json:"metadata"`
BucketObjectsMetadata map[string]map[string]string `json:"objectsMetadata"` BucketObjects map[string]interface{} `json:"objects"`
} }

@ -198,14 +198,14 @@ func (dt donut) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadC
if err != nil { if err != nil {
return "", iodine.New(err, errParams) return "", iodine.New(err, errParams)
} }
if _, ok := bucketMeta.Buckets[bucket].BucketObjectsMetadata[object]; ok { if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; ok {
return "", iodine.New(ObjectExists{Object: object}, errParams) return "", iodine.New(ObjectExists{Object: object}, errParams)
} }
md5sum, err := dt.buckets[bucket].WriteObject(object, reader, expectedMD5Sum) md5sum, err := dt.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata)
if err != nil { if err != nil {
return "", iodine.New(err, errParams) return "", iodine.New(err, errParams)
} }
bucketMeta.Buckets[bucket].BucketObjectsMetadata[object] = metadata bucketMeta.Buckets[bucket].BucketObjects[object] = 1
if err := dt.setDonutBucketMetadata(bucketMeta); err != nil { if err := dt.setDonutBucketMetadata(bucketMeta); err != nil {
return "", iodine.New(err, errParams) return "", iodine.New(err, errParams)
} }
@ -236,7 +236,7 @@ func (dt donut) GetObject(bucket, object string) (reader io.ReadCloser, size int
} }
// GetObjectMetadata - get object metadata // GetObjectMetadata - get object metadata
func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, map[string]string, error) { func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, error) {
dt.lock.RLock() dt.lock.RLock()
defer dt.lock.RUnlock() defer dt.lock.RUnlock()
errParams := map[string]string{ errParams := map[string]string{
@ -244,23 +244,23 @@ func (dt donut) GetObjectMetadata(bucket, object string) (ObjectMetadata, map[st
"object": object, "object": object,
} }
if err := dt.listDonutBuckets(); err != nil { if err := dt.listDonutBuckets(); err != nil {
return ObjectMetadata{}, nil, iodine.New(err, errParams) return ObjectMetadata{}, iodine.New(err, errParams)
} }
if _, ok := dt.buckets[bucket]; !ok { if _, ok := dt.buckets[bucket]; !ok {
return ObjectMetadata{}, nil, iodine.New(BucketNotFound{Bucket: bucket}, errParams) return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
} }
bucketMeta, err := dt.getDonutBucketMetadata() bucketMeta, err := dt.getDonutBucketMetadata()
if err != nil { if err != nil {
return ObjectMetadata{}, nil, iodine.New(err, errParams) return ObjectMetadata{}, iodine.New(err, errParams)
} }
if _, ok := bucketMeta.Buckets[bucket].BucketObjectsMetadata[object]; !ok { if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; !ok {
return ObjectMetadata{}, nil, iodine.New(ObjectNotFound{Object: object}, errParams) return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: object}, errParams)
} }
objectMetadata, err := dt.buckets[bucket].GetObjectMetadata(object) objectMetadata, err := dt.buckets[bucket].GetObjectMetadata(object)
if err != nil { if err != nil {
return ObjectMetadata{}, nil, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
return objectMetadata, bucketMeta.Buckets[bucket].BucketObjectsMetadata[object], nil return objectMetadata, nil
} }
// getDiskWriters - // getDiskWriters -

@ -198,12 +198,12 @@ func (s *MySuite) TestNewObjectMetadata(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(calculatedMd5Sum, Equals, expectedMd5Sum) c.Assert(calculatedMd5Sum, Equals, expectedMd5Sum)
_, additionalMetadata, err := donut.GetObjectMetadata("foo", "obj") objectMetadata, err := donut.GetObjectMetadata("foo", "obj")
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(additionalMetadata["contentType"], Equals, metadata["contentType"]) c.Assert(objectMetadata.Metadata["contentType"], Equals, metadata["contentType"])
c.Assert(additionalMetadata["foo"], Equals, metadata["foo"]) c.Assert(objectMetadata.Metadata["foo"], Equals, metadata["foo"])
c.Assert(additionalMetadata["hello"], Equals, metadata["hello"]) c.Assert(objectMetadata.Metadata["hello"], Equals, metadata["hello"])
} }
// test create object fails without name // test create object fails without name
@ -252,7 +252,7 @@ func (s *MySuite) TestNewObjectCanBeWritten(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(actualData.Bytes(), DeepEquals, []byte(data)) c.Assert(actualData.Bytes(), DeepEquals, []byte(data))
actualMetadata, _, err := donut.GetObjectMetadata("foo", "obj") actualMetadata, err := donut.GetObjectMetadata("foo", "obj")
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(expectedMd5Sum, Equals, actualMetadata.MD5Sum) c.Assert(expectedMd5Sum, Equals, actualMetadata.MD5Sum)
c.Assert(int64(len(data)), Equals, actualMetadata.Size) c.Assert(int64(len(data)), Equals, actualMetadata.Size)

@ -39,7 +39,7 @@ type ObjectStorage interface {
// Object operations // Object operations
GetObject(bucket, object string) (io.ReadCloser, int64, error) GetObject(bucket, object string) (io.ReadCloser, int64, error)
GetObjectMetadata(bucket, object string) (ObjectMetadata, map[string]string, error) GetObjectMetadata(bucket, object string) (ObjectMetadata, error)
PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error)
} }

@ -42,10 +42,6 @@ type donutDriver struct {
lock *sync.RWMutex lock *sync.RWMutex
} }
const (
blockSize = 10 * 1024 * 1024
)
// This is a dummy nodeDiskMap which is going to be deprecated soon // This is a dummy nodeDiskMap which is going to be deprecated soon
// once the Management API is standardized, this map is useful for now // once the Management API is standardized, this map is useful for now
// to show multi disk API correctness and parity calculation // to show multi disk API correctness and parity calculation
@ -165,11 +161,11 @@ func (d donutDriver) CreateBucket(bucketName, acl string) error {
acl = "private" acl = "private"
} }
if err := d.donut.MakeBucket(bucketName, acl); err != nil { if err := d.donut.MakeBucket(bucketName, acl); err != nil {
err = iodine.ToError(err) switch iodine.ToError(err).(type) {
if err.Error() == "bucket exists" { case donut.BucketExists:
return iodine.New(drivers.BucketExists{Bucket: bucketName}, nil) return iodine.New(drivers.BucketExists{Bucket: bucketName}, nil)
} }
return err return iodine.New(err, nil)
} }
return nil return nil
} }
@ -312,7 +308,7 @@ func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.O
if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" { if !drivers.IsValidObjectName(objectName) || strings.TrimSpace(objectName) == "" {
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams) return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams)
} }
metadata, additionalMetadata, err := d.donut.GetObjectMetadata(bucketName, objectName) metadata, err := d.donut.GetObjectMetadata(bucketName, objectName)
if err != nil { if err != nil {
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{ return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{
Bucket: bucketName, Bucket: bucketName,
@ -323,7 +319,7 @@ func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.O
Bucket: bucketName, Bucket: bucketName,
Key: objectName, Key: objectName,
ContentType: additionalMetadata["contentType"], ContentType: metadata.Metadata["contentType"],
Created: metadata.Created, Created: metadata.Created,
Md5: metadata.MD5Sum, Md5: metadata.MD5Sum,
Size: metadata.Size, Size: metadata.Size,
@ -365,7 +361,7 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
} }
var results []drivers.ObjectMetadata var results []drivers.ObjectMetadata
for _, objectName := range actualObjects { for _, objectName := range actualObjects {
objectMetadata, _, err := d.donut.GetObjectMetadata(bucketName, objectName) objectMetadata, err := d.donut.GetObjectMetadata(bucketName, objectName)
if err != nil { if err != nil {
return nil, drivers.BucketResourcesMetadata{}, iodine.New(err, errParams) return nil, drivers.BucketResourcesMetadata{}, iodine.New(err, errParams)
} }

Loading…
Cancel
Save