diff --git a/pkg/donutbox/donutbox.go b/pkg/donutbox/donutbox.go index 822d94442..72622ad2e 100644 --- a/pkg/donutbox/donutbox.go +++ b/pkg/donutbox/donutbox.go @@ -14,13 +14,58 @@ type DonutBox interface { SetBucketMetadata(bucket string, metadata map[string]string) error // object operations - GetObjectWriter(bucket, object string, column, blockSize uint) *io.PipeWriter - GetObjectReader(bucket, object string, column int) (io.Reader, error) - SetObjectMetadata(bucket, object string, metadata map[string]string) error - GetObjectMetadata(bucket, object string) (map[string]string, error) + GetObjectWriter(bucket, object string, column, blockSize uint) (*NewObject, error) + GetObjectReader(bucket, object string, column uint) (io.Reader, error) + GetObjectMetadata(bucket, object string, column uint) (map[string]string, error) } // Result is a result for async tasks type Result struct { Err error } + +// CreateNewObject creates a new object wrapping a writer. Clients are not expected to use this directly. This is exposed for storage drivers. +func CreateNewObject(writer *io.PipeWriter) *NewObject { + return &NewObject{writer: writer} +} + +// NewObject wraps a writer and allows setting metadata. On a successful close, the object is committed by the backend. +type NewObject struct { + metadata map[string]string + writer *io.PipeWriter +} + +// Write data +func (newObject *NewObject) Write(data []byte) (int, error) { + return newObject.writer.Write(data) +} + +// SetMetadata sets metadata for an object +func (newObject *NewObject) SetMetadata(metadata map[string]string) { + newMetadata := make(map[string]string) + for k, v := range metadata { + newMetadata[k] = v + } + newObject.metadata = newMetadata +} + +// Close and commit the object +func (newObject *NewObject) Close() error { + return newObject.writer.Close() +} + +// CloseWithError closes the object with an error, causing the backend to abandon the object +func (newObject *NewObject) CloseWithError(err error) error { + return newObject.writer.CloseWithError(err) +} + +// GetMetadata returns a copy of the metadata set metadata +func (newObject *NewObject) GetMetadata() map[string]string { + newMetadata := make(map[string]string) + if newMetadata != nil { + for k, v := range newObject.metadata { + newMetadata[k] = v + } + } + return newMetadata +} diff --git a/pkg/donutbox/donutmem/donutmem.go b/pkg/donutbox/donutmem/donutmem.go index 24cf5343c..51fd86cb3 100644 --- a/pkg/donutbox/donutmem/donutmem.go +++ b/pkg/donutbox/donutmem/donutmem.go @@ -74,12 +74,17 @@ func (donutMem donutMem) ListObjectsInBucket(bucketKey, prefixKey string) ([]str if curBucket, ok := donutMem.buckets[bucketKey]; ok { curBucket.lock.RLock() defer curBucket.lock.RUnlock() - var objects []string + objectMap := make(map[string]string) for objectKey := range curBucket.objects { - if strings.HasPrefix(objectKey, prefixKey) { - objects = append(objects, objectKey) + objectName := strings.Split(objectKey, "#")[0] + if strings.HasPrefix(objectName, prefixKey) { + objectMap[objectName] = objectName } } + var objects []string + for k := range objectMap { + objects = append(objects, k) + } return objects, nil } return nil, errors.New("Bucket does not exist") @@ -118,24 +123,20 @@ func (donutMem donutMem) SetBucketMetadata(bucketKey string, metadata map[string } // object operations -func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockSize uint) *io.PipeWriter { +func (donutMem donutMem) GetObjectWriter(bucketKey, objectKey string, column uint, blockSize uint) (*donutbox.NewObject, error) { + key := getKey(bucketKey, objectKey, column) reader, writer := io.Pipe() + returnObject := donutbox.CreateNewObject(writer) donutMem.lock.RLock() defer donutMem.lock.RUnlock() - if curBucket, ok := donutMem.buckets[bucket]; ok { + if curBucket, ok := donutMem.buckets[bucketKey]; ok { curBucket.lock.Lock() defer curBucket.lock.Unlock() if _, ok := curBucket.objects[key]; !ok { - // create object - metadata := make(map[string]string) - metadata["key"] = key - metadata["blockSize"] = strconv.FormatInt(int64(blockSize), 10) - newObject := object{ - name: key, - data: make([]byte, 0), - metadata: metadata, - lock: new(sync.RWMutex), + name: key, + data: make([]byte, 0), + lock: new(sync.RWMutex), } newObject.lock.Lock() @@ -143,33 +144,45 @@ func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockS go func() { defer newObject.lock.Unlock() var objBuffer bytes.Buffer + _, err := io.Copy(&objBuffer, reader) if err == nil { newObject.data = objBuffer.Bytes() writer.Close() - } else { - donutMem.lock.RLock() - defer donutMem.lock.RUnlock() - bucket, _ := donutMem.buckets[bucket] - bucket.lock.Lock() - defer bucket.lock.Unlock() - delete(bucket.objects, key) - writer.CloseWithError(err) + + metadata := returnObject.GetMetadata() + for k, v := range metadata { + metadata[k] = v + } + metadata["key"] = objectKey + metadata["column"] = strconv.FormatUint(uint64(column), 10) + newObject.metadata = metadata + + return } + + donutMem.lock.RLock() + defer donutMem.lock.RUnlock() + bucket, _ := donutMem.buckets[bucketKey] + bucket.lock.Lock() + defer bucket.lock.Unlock() + delete(bucket.objects, key) + writer.CloseWithError(err) }() - return writer + return returnObject, nil } writer.CloseWithError(errors.New("Object exists")) - return writer + return nil, errors.New("Object exists") } writer.CloseWithError(errors.New("Bucket does not exist")) - return writer + return nil, errors.New("Bucket does not exist") } -func (donutMem donutMem) GetObjectReader(bucket, key string, column int) (io.Reader, error) { +func (donutMem donutMem) GetObjectReader(bucketKey, objectKey string, column uint) (io.Reader, error) { + key := getKey(bucketKey, objectKey, column) donutMem.lock.RLock() defer donutMem.lock.RUnlock() - if curBucket, ok := donutMem.buckets[bucket]; ok { + if curBucket, ok := donutMem.buckets[bucketKey]; ok { curBucket.lock.RLock() defer curBucket.lock.RUnlock() if curObject, ok := curBucket.objects[key]; ok { @@ -182,35 +195,37 @@ func (donutMem donutMem) GetObjectReader(bucket, key string, column int) (io.Rea return nil, errors.New("Bucket not found") } -func (donutMem donutMem) SetObjectMetadata(bucketKey, objectKey string, metadata map[string]string) error { - donutMem.lock.RLock() - defer donutMem.lock.RUnlock() - if curBucket, ok := donutMem.buckets[bucketKey]; ok { - curBucket.lock.RLock() - defer curBucket.lock.RUnlock() - if curObject, ok := curBucket.objects[objectKey]; ok { - curObject.lock.Lock() - defer curObject.lock.Unlock() - newMetadata := make(map[string]string) - for k, v := range metadata { - newMetadata[k] = v - } - curObject.metadata = newMetadata - return nil - } - return errors.New("Object not found") - } - return errors.New("Bucket not found") -} - -func (donutMem donutMem) GetObjectMetadata(bucketKey, objectKey string) (map[string]string, error) { +//func (donutMem donutMem) SetObjectMetadata(bucketKey, objectKey string, column uint, metadata map[string]string) error { +// key := getKey(bucketKey, objectKey, column) +// donutMem.lock.RLock() +// defer donutMem.lock.RUnlock() +// if curBucket, ok := donutMem.buckets[bucketKey]; ok { +// curBucket.lock.RLock() +// defer curBucket.lock.RUnlock() +// if curObject, ok := curBucket.objects[key]; ok { +// curObject.lock.Lock() +// defer curObject.lock.Unlock() +// newMetadata := make(map[string]string) +// for k, v := range metadata { +// newMetadata[k] = v +// } +// curObject.metadata = newMetadata +// return nil +// } +// return errors.New("Object not found") +// } +// return errors.New("Bucket not found") +//} + +func (donutMem donutMem) GetObjectMetadata(bucketKey, objectKey string, column uint) (map[string]string, error) { + key := getKey(bucketKey, objectKey, column) donutMem.lock.RLock() defer donutMem.lock.RUnlock() if curBucket, ok := donutMem.buckets[bucketKey]; ok { curBucket.lock.RLock() defer curBucket.lock.RUnlock() - if curObject, ok := curBucket.objects[objectKey]; ok { + if curObject, ok := curBucket.objects[key]; ok { curObject.lock.RLock() defer curObject.lock.RUnlock() result := make(map[string]string) @@ -223,3 +238,7 @@ func (donutMem donutMem) GetObjectMetadata(bucketKey, objectKey string) (map[str } return nil, errors.New("Bucket not found") } + +func getKey(bucketKey, objectKey string, column uint) string { + return objectKey + "#" + strconv.FormatUint(uint64(column), 10) +} diff --git a/pkg/donutbox/donutmem/donutmem_test.go b/pkg/donutbox/donutmem/donutmem_test.go index 2267c10ac..2b1ac475d 100644 --- a/pkg/donutbox/donutmem/donutmem_test.go +++ b/pkg/donutbox/donutmem/donutmem_test.go @@ -19,15 +19,16 @@ func (s *MySuite) TestCreateAndReadObject(c *C) { data := "Hello World" donut := NewDonutMem() - writer := donut.GetObjectWriter("foo", "bar", 0, 2) - count, err := writer.Write([]byte("hello")) + writer, err := donut.GetObjectWriter("foo", "bar", 0, 2) + c.Assert(writer, IsNil) c.Assert(err, Not(IsNil)) err = donut.CreateBucket("foo") c.Assert(err, IsNil) - writer = donut.GetObjectWriter("foo", "bar", 0, 2) - count, err = writer.Write([]byte(data)) + writer, err = donut.GetObjectWriter("foo", "bar", 0, 2) + c.Assert(err, IsNil) + count, err := writer.Write([]byte(data)) c.Assert(count, Equals, len(data)) c.Assert(err, IsNil) err = writer.Close() @@ -40,13 +41,8 @@ func (s *MySuite) TestCreateAndReadObject(c *C) { c.Assert(result, DeepEquals, []byte(data)) // try writing, should see error - writer = donut.GetObjectWriter("foo", "bar", 0, 2) - count, err = writer.Write([]byte("different data")) - c.Assert(count, Equals, 0) - c.Assert(err, Not(IsNil)) - // try again, should see error - count, err = writer.Write([]byte("different data")) - c.Assert(count, Equals, 0) + writer, err = donut.GetObjectWriter("foo", "bar", 0, 2) + c.Assert(writer, IsNil) c.Assert(err, Not(IsNil)) // data should not change @@ -87,7 +83,8 @@ func (s *MySuite) TestObjectList(c *C) { for i := 0; i < 10; i++ { object := "foo" + strconv.Itoa(i) objects = append(objects, object) - writer := donut.GetObjectWriter("foo", object, 0, 2) + writer, err := donut.GetObjectWriter("foo", object, 0, 2) + c.Assert(err, IsNil) writer.Write([]byte(object)) writer.Close() c.Assert(err, IsNil) @@ -125,24 +122,31 @@ func (s *MySuite) TestObjectMetadata(c *C) { metadata["hello"] = "world" metadata["foo"] = "bar" - err := donut.SetObjectMetadata("foo", "bar", metadata) - c.Assert(err, Not(IsNil)) - - result, err := donut.GetObjectMetadata("foo", "bar") + result, err := donut.GetObjectMetadata("foo", "bar", 1) c.Assert(result, IsNil) c.Assert(err, Not(IsNil)) - writer := donut.GetObjectWriter("foo", "bar", 0, 2) + writer, err := donut.GetObjectWriter("foo", "bar", 1, 2) + c.Assert(err, IsNil) _, err = writer.Write([]byte("Hello World")) c.Assert(err, IsNil) + writer.SetMetadata(metadata) err = writer.Close() c.Assert(err, IsNil) - err = donut.SetObjectMetadata("foo", "bar", metadata) - c.Assert(err, IsNil) + expectedMetadata := make(map[string]string) + for k, v := range metadata { + expectedMetadata[k] = v + } + expectedMetadata["key"] = "bar" + expectedMetadata["column"] = "1" - result, err = donut.GetObjectMetadata("foo", "bar") + result, err = donut.GetObjectMetadata("foo", "bar", 1) c.Assert(err, IsNil) - c.Assert(result, DeepEquals, metadata) + c.Assert(result, DeepEquals, expectedMetadata) + + result, err = donut.GetObjectMetadata("foo", "bar", 0) + c.Assert(err, Not(IsNil)) + c.Assert(result, IsNil) } diff --git a/pkg/storage/encoded/encoded.go b/pkg/storage/encoded/encoded.go index 8fd78cccc..0a1cecc58 100644 --- a/pkg/storage/encoded/encoded.go +++ b/pkg/storage/encoded/encoded.go @@ -19,8 +19,11 @@ package encoded import ( "errors" "github.com/minio-io/minio/pkg/donutbox" + "github.com/minio-io/minio/pkg/encoding/erasure" "github.com/minio-io/minio/pkg/storage" + "github.com/minio-io/minio/pkg/utils/split" "io" + "strconv" ) // StorageDriver creates a new single disk storage driver using donut without encoding. @@ -83,6 +86,68 @@ func (diskStorage StorageDriver) ListObjects(bucket string, resources storage.Bu } // CreateObject creates a new object -func (diskStorage StorageDriver) CreateObject(bucket string, key string, contentType string, data io.Reader) error { - return errors.New("Not Implemented") +func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string, contentType string, reader io.Reader) error { + blockSize := 10 * 1024 * 1024 + // split stream + splitStream := split.Stream(reader, uint64(blockSize)) + writers := make([]*donutbox.NewObject, 16) + for i := 0; i < 16; i++ { + newWriter, err := diskStorage.donutBox.GetObjectWriter(bucketKey, objectKey, uint(i), uint(blockSize)) + if err != nil { + closeAllWritersWithError(writers, err) + return err + } + writers[i] = newWriter + } + totalLength := uint64(0) + for chunk := range splitStream { + params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) + if err != nil { + return err + } + totalLength = totalLength + uint64(len(chunk.Data)) + encoder := erasure.NewEncoder(params) + if chunk.Err == nil { + parts, _ := encoder.Encode(chunk.Data) + for index, part := range parts { + if _, err := writers[index].Write(part); err != nil { + closeAllWritersWithError(writers, err) + return err + } + } + } else { + closeAllWritersWithError(writers, chunk.Err) + return chunk.Err + } + // encode data + // write + } + // close connections + closeAllWriters(writers) + + metadata := make(map[string]string) + metadata["length"] = strconv.FormatUint(totalLength, 10) + metadata["blockSize"] = strconv.FormatUint(uint64(blockSize), 10) + // metadata["md5"] := md5sum + for column := uint(0); column < 16; column++ { + writers[column].SetMetadata(metadata) + } + + return nil +} + +func closeAllWriters(writers []*donutbox.NewObject) { + for _, writer := range writers { + if writer != nil { + writer.Close() + } + } +} + +func closeAllWritersWithError(writers []*donutbox.NewObject, err error) { + for _, writer := range writers { + if writer != nil { + writer.CloseWithError(err) + } + } }