From 10ae089c467af65b3a3dc734ef21dfcffdcfbcec Mon Sep 17 00:00:00 2001 From: "Frederick F. Kautz IV" Date: Mon, 16 Mar 2015 17:40:59 -0700 Subject: [PATCH] Better handling of metadata in encoded --- pkg/storage/encoded/encoded.go | 77 +++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 14 deletions(-) diff --git a/pkg/storage/encoded/encoded.go b/pkg/storage/encoded/encoded.go index 0b34ddd48..c9cb0a5dc 100644 --- a/pkg/storage/encoded/encoded.go +++ b/pkg/storage/encoded/encoded.go @@ -18,6 +18,7 @@ package encoded import ( "bytes" + "encoding/hex" "errors" "github.com/minio-io/minio/pkg/donutbox" "github.com/minio-io/minio/pkg/encoding/erasure" @@ -25,6 +26,7 @@ import ( "github.com/minio-io/minio/pkg/utils/split" "io" "strconv" + "time" ) // StorageDriver creates a new single disk storage driver using donut without encoding. @@ -69,13 +71,24 @@ func (diskStorage StorageDriver) GetBucketPolicy(bucket string) (storage.BucketP // GetObject retrieves an object and writes it to a writer func (diskStorage StorageDriver) GetObject(target io.Writer, bucket, key string) (int64, error) { metadata, err := diskStorage.donutBox.GetObjectMetadata(bucket, key, 0) - chunkCount, err := strconv.Atoi(metadata["chunkCount"]) - columnCount, err := strconv.Atoi(metadata["columnCount"]) + k, err := strconv.Atoi(metadata["erasureK"]) + if err != nil { + return 0, errors.New("Cannot parse erasureK") + } + m, err := strconv.Atoi(metadata["erasureM"]) + if err != nil { + return 0, errors.New("Cannot parse erasureM") + } + columnCount := k + m blockSize, err := strconv.Atoi(metadata["blockSize"]) - length, err := strconv.Atoi(metadata["length"]) if err != nil { - return 0, err + return 0, errors.New("Cannot parse blockSize") } + size, err := strconv.Atoi(metadata["size"]) + if err != nil { + return 0, errors.New("Cannot parse length") + } + chunkCount := size/blockSize + 1 var readers []io.Reader for column := 0; column < columnCount; column++ { reader, err := diskStorage.donutBox.GetObjectReader(bucket, key, uint(column)) @@ -85,8 +98,8 @@ func (diskStorage StorageDriver) GetObject(target io.Writer, bucket, key string) readers = append(readers, reader) } - totalWritten := int64(length) - totalRemaining := int64(length) + totalWritten := int64(size) + totalRemaining := int64(size) if err != err { return 0, err } @@ -127,8 +140,22 @@ func (diskStorage StorageDriver) GetPartialObject(w io.Writer, bucket, object st } // GetObjectMetadata retrieves an object's metadata -func (diskStorage StorageDriver) GetObjectMetadata(bucket, key string, prefix string) (metadata storage.ObjectMetadata, err error) { - return metadata, errors.New("Not Implemented") +func (diskStorage StorageDriver) GetObjectMetadata(bucket, key string, prefix string) (storage.ObjectMetadata, error) { + metadata, err := diskStorage.donutBox.GetObjectMetadata(bucket, key, 0) + if err != nil { + return storage.ObjectMetadata{}, err + } + created, err := time.Parse(metadata["created"], time.RFC3339Nano) + size, err := strconv.ParseInt(metadata["size"], 10, 64) + objectMetadata := storage.ObjectMetadata{ + Bucket: bucket, + Key: key, + ContentType: metadata["contentType"], + Created: created, + ETag: hex.EncodeToString([]byte(metadata["md5"])), + Size: size, + } + return objectMetadata, nil } // ListObjects lists objects @@ -177,12 +204,18 @@ func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string } // close connections - metadata := make(map[string]string) - metadata["length"] = strconv.FormatUint(totalLength, 10) - metadata["blockSize"] = strconv.FormatUint(uint64(blockSize), 10) - metadata["chunkCount"] = strconv.FormatUint(uint64(chunkCount), 10) - metadata["columnCount"] = strconv.FormatUint(uint64(16), 10) - // metadata["md5"] := md5sum + metadataObj := storage.ObjectMetadata{ + Bucket: bucketKey, + Key: objectKey, + + ContentType: contentType, + Created: time.Now(), + ETag: "md5", + Size: int64(totalLength), + } + + metadata := createMetadata(metadataObj, blockSize, 8, 8, "Cauchy") + for column := uint(0); column < 16; column++ { writers[column].SetMetadata(metadata) } @@ -208,3 +241,19 @@ func closeAllWritersWithError(writers []*donutbox.NewObject, err error) { } } } + +func createMetadata(metadataObject storage.ObjectMetadata, blockSize int, k, m uint8, technique string) map[string]string { + metadata := make(map[string]string) + metadata["bucket"] = metadataObject.Bucket + metadata["key"] = metadataObject.Key + metadata["contentType"] = metadataObject.ContentType + metadata["created"] = metadataObject.Created.Format(time.RFC3339Nano) + metadata["md5"] = metadataObject.ETag + metadata["size"] = strconv.FormatInt(metadataObject.Size, 10) + + metadata["blockSize"] = strconv.FormatUint(uint64(blockSize), 10) + metadata["erasureK"] = strconv.FormatUint(uint64(k), 10) + metadata["erasureM"] = strconv.FormatUint(uint64(m), 10) + metadata["erasureTechnique"] = technique + return metadata +}