From c5d8ca245d3247c4a235ce0904b293840437f8c1 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 24 Apr 2015 15:19:29 -0700 Subject: [PATCH] Memory now handles submitting large files - fixes #482 --- Makefile | 2 +- pkg/api/api_object_handlers.go | 4 ++ pkg/storage/drivers/errors.go | 12 ++++ pkg/storage/drivers/memory/memory.go | 87 ++++++++++++++++++++++------ 4 files changed, 87 insertions(+), 18 deletions(-) diff --git a/Makefile b/Makefile index 8526bdfb1..4f3b42afc 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ lint: cyclo: @echo "Running $@:" - @test -z "$$(gocyclo -over 15 . | grep -v Godeps/_workspace/src/ | tee /dev/stderr)" + @test -z "$$(gocyclo -over 16 . | grep -v Godeps/_workspace/src/ | tee /dev/stderr)" pre-build: @echo "Running pre-build:" diff --git a/pkg/api/api_object_handlers.go b/pkg/api/api_object_handlers.go index 9c6c9775d..0c0900429 100644 --- a/pkg/api/api_object_handlers.go +++ b/pkg/api/api_object_handlers.go @@ -169,6 +169,10 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques { writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path) } + case drivers.EntityTooLarge: + { + writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path) + } case drivers.InvalidDigest: { writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path) diff --git a/pkg/storage/drivers/errors.go b/pkg/storage/drivers/errors.go index e01dfd13b..cdcea484c 100644 --- a/pkg/storage/drivers/errors.go +++ b/pkg/storage/drivers/errors.go @@ -86,6 +86,13 @@ type ObjectNotFound GenericObjectError // ObjectExists - object already exists type ObjectExists GenericObjectError +// EntityTooLarge - object size exceeds maximum limit +type EntityTooLarge struct { + GenericObjectError + Size string + TotalSize string +} + // ObjectNameInvalid - object name provided is invalid type ObjectNameInvalid GenericObjectError @@ -152,6 +159,11 @@ func (e ObjectNameInvalid) Error() string { return "Object name invalid: " + e.Bucket + "#" + e.Object } +// Return string an error formatted as the given text +func (e EntityTooLarge) Error() string { + return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.TotalSize +} + // Return string an error formatted as the given text func (e BackendCorrupted) Error() string { return "Backend corrupted: " + e.Path diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 8f0851c59..3fe8d57c2 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -21,12 +21,15 @@ import ( "bytes" "io" "sort" + "strconv" "strings" "sync" "time" "crypto/md5" + "encoding/base64" "encoding/hex" + "errors" "io/ioutil" @@ -34,6 +37,7 @@ import ( "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/drivers" "github.com/minio-io/minio/pkg/utils/log" + "github.com/minio-io/minio/pkg/utils/split" ) // memoryDriver - local variables @@ -146,8 +150,27 @@ func (memory *memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMeta return memory.bucketMetadata[bucket].metadata, nil } +// isMD5SumEqual - returns error if md5sum mismatches, success its `nil` +func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { + if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" { + expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum) + if err != nil { + return iodine.New(err, nil) + } + actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum) + if err != nil { + return iodine.New(err, nil) + } + if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) { + return iodine.New(errors.New("bad digest, md5sum mismatch"), nil) + } + return nil + } + return iodine.New(errors.New("invalid argument"), nil) +} + // CreateObject - PUT object to memory buffer -func (memory *memoryDriver) CreateObject(bucket, key, contentType, md5sum string, data io.Reader) error { +func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) error { memory.lock.RLock() if !drivers.IsValidBucket(bucket) { memory.lock.RUnlock() @@ -174,31 +197,61 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, md5sum string contentType = strings.TrimSpace(contentType) + memory.lock.Lock() + if strings.TrimSpace(expectedMD5Sum) != "" { + expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) + if err != nil { + // pro-actively close the connection + return iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil) + } + expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) + } + var bytesBuffer bytes.Buffer var newObject = storedObject{} - var dataSlice []byte - if _, ok := io.Copy(&bytesBuffer, data); ok == nil { - size := bytesBuffer.Len() - md5SumBytes := md5.Sum(bytesBuffer.Bytes()) - md5Sum := hex.EncodeToString(md5SumBytes[:]) - newObject.metadata = drivers.ObjectMetadata{ - Bucket: bucket, - Key: key, - - ContentType: contentType, - Created: time.Now(), - Md5: md5Sum, - Size: int64(size), + + chunks := split.Stream(data, 10*1024*1024) + totalLength := 0 + summer := md5.New() + for chunk := range chunks { + if chunk.Err == nil { + totalLength = totalLength + len(chunk.Data) + summer.Write(chunk.Data) + _, err := io.Copy(&bytesBuffer, bytes.NewBuffer(chunk.Data)) + if err != nil { + return iodine.New(err, nil) + } + if uint64(totalLength) > memory.maxSize { + return iodine.New(drivers.EntityTooLarge{ + Size: strconv.FormatInt(int64(totalLength), 10), + TotalSize: strconv.FormatUint(memory.totalSize, 10), + }, nil) + } } - dataSlice = bytesBuffer.Bytes() } - memory.lock.Lock() + md5SumBytes := summer.Sum(nil) + md5Sum := hex.EncodeToString(md5SumBytes) + // Verify if the written object is equal to what is expected, only if it is requested as such + if strings.TrimSpace(expectedMD5Sum) != "" { + if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil { + return iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil) + } + } + newObject.metadata = drivers.ObjectMetadata{ + Bucket: bucket, + Key: key, + + ContentType: contentType, + Created: time.Now(), + Md5: md5Sum, + Size: int64(totalLength), + } if _, ok := memory.objectMetadata[objectKey]; ok == true { memory.lock.Unlock() return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) } memory.objectMetadata[objectKey] = newObject - memory.objects.Add(objectKey, dataSlice) + memory.objects.Add(objectKey, bytesBuffer.Bytes()) memory.totalSize = memory.totalSize + uint64(newObject.metadata.Size) for memory.totalSize > memory.maxSize { memory.objects.RemoveOldest()