From 375860077ddcee5eaac136ae02275ffdedeb8c98 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 8 Jul 2015 20:18:32 -0700 Subject: [PATCH] HTTP header Content-Length signifies body length of the request, if its smaller reply appropriately This patch also handles large individual part sizes > 5MB by using less memory copies. --- pkg/donut/donut-v2.go | 35 ++++++++++++++++++++++++------- pkg/donut/errors.go | 8 +++++++ pkg/donut/multipart.go | 31 ++++++++++++++++----------- pkg/donut/signature-v4.go | 6 +++--- pkg/server/api/object-handlers.go | 4 ++++ 5 files changed, 61 insertions(+), 23 deletions(-) diff --git a/pkg/donut/donut-v2.go b/pkg/donut/donut-v2.go index 582cee7de..a7fb83f43 100644 --- a/pkg/donut/donut-v2.go +++ b/pkg/donut/donut-v2.go @@ -25,6 +25,7 @@ import ( "io" "io/ioutil" "log" + "net/http" "runtime/debug" "sort" "strconv" @@ -54,9 +55,10 @@ type Config struct { // API - local variables type API struct { config *Config + req *http.Request lock *sync.Mutex objects *data.Cache - multiPartObjects *data.Cache + multiPartObjects map[string]*data.Cache storedBuckets *metadata.Cache nodes map[string]node buckets map[string]bucket @@ -91,9 +93,8 @@ func New() (Interface, error) { a.nodes = make(map[string]node) a.buckets = make(map[string]bucket) a.objects = data.NewCache(a.config.MaxSize) - a.multiPartObjects = data.NewCache(0) + a.multiPartObjects = make(map[string]*data.Cache) a.objects.OnEvicted = a.evictedObject - a.multiPartObjects.OnEvicted = a.evictedPart a.lock = new(sync.Mutex) if len(a.config.NodeDiskMap) > 0 { @@ -113,16 +114,21 @@ func New() (Interface, error) { } for k, v := range buckets { var newBucket = storedBucket{} + newBucket.bucketMetadata = v newBucket.objectMetadata = make(map[string]ObjectMetadata) newBucket.multiPartSession = make(map[string]MultiPartSession) newBucket.partMetadata = make(map[int]PartMetadata) - newBucket.bucketMetadata = v a.storedBuckets.Set(k, newBucket) } } return a, nil } +// SetRequest API for setting request header +func (donut API) SetRequest(req *http.Request) { + donut.req = req +} + // GetObject - GET object from cache buffer func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) { donut.lock.Lock() @@ -344,7 +350,16 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s } if len(donut.config.NodeDiskMap) > 0 { - objMetadata, err := donut.putObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType}) + objMetadata, err := donut.putObject( + bucket, + key, + expectedMD5Sum, + data, + map[string]string{ + "contentType": contentType, + "contentLength": strconv.FormatInt(size, 10), + }, + ) if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } @@ -356,7 +371,7 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s hash := md5.New() var err error - var totalLength int + var totalLength int64 for err == nil { var length int byteBuffer := make([]byte, 1024*1024) @@ -371,13 +386,17 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s if !ok { return ObjectMetadata{}, iodine.New(InternalError{}, nil) } - totalLength += length + totalLength += int64(length) go debug.FreeOSMemory() } + if totalLength != size { + // Delete perhaps the object is already saved, due to the nature of append() + donut.objects.Delete(objectKey) + return ObjectMetadata{}, iodine.New(IncompleteBody{Bucket: bucket, Object: key}, nil) + } if err != io.EOF { return ObjectMetadata{}, iodine.New(err, nil) } - md5SumBytes := hash.Sum(nil) md5Sum := hex.EncodeToString(md5SumBytes) // Verify if the written object is equal to what is expected, only if it is requested as such diff --git a/pkg/donut/errors.go b/pkg/donut/errors.go index 692d85cb7..ea909235d 100644 --- a/pkg/donut/errors.go +++ b/pkg/donut/errors.go @@ -269,6 +269,14 @@ func (e EntityTooLarge) Error() string { return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.MaxSize } +// IncompleteBody You did not provide the number of bytes specified by the Content-Length HTTP header +type IncompleteBody GenericObjectError + +// Return string an error formatted as the given text +func (e IncompleteBody) Error() string { + return e.Bucket + "#" + e.Object + "has incomplete body" +} + // Return string an error formatted as the given text func (e BackendCorrupted) Error() string { return "Backend corrupted: " + e.Path diff --git a/pkg/donut/multipart.go b/pkg/donut/multipart.go index 78269ef03..8a9dbd176 100644 --- a/pkg/donut/multipart.go +++ b/pkg/donut/multipart.go @@ -31,6 +31,7 @@ import ( "strings" "time" + "github.com/minio/minio/pkg/donut/cache/data" "github.com/minio/minio/pkg/iodine" ) @@ -62,6 +63,9 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er initiated: time.Now(), totalParts: 0, } + multiPartCache := data.NewCache(0) + multiPartCache.OnEvicted = donut.evictedPart + donut.multiPartObjects[uploadID] = multiPartCache donut.storedBuckets.Set(bucket, storedBucket) return uploadID, nil } @@ -134,11 +138,11 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont // calculate md5 hash := md5.New() - var readBytes []byte var err error - var length int + var totalLength int64 for err == nil { + var length int byteBuffer := make([]byte, 1024*1024) length, err = data.Read(byteBuffer) // While hash.Write() wouldn't mind a Nil byteBuffer @@ -147,19 +151,22 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont break } hash.Write(byteBuffer[0:length]) - readBytes = append(readBytes, byteBuffer[0:length]...) + ok := donut.multiPartObjects[uploadID].Append(partID, byteBuffer[0:length]) + if !ok { + return "", iodine.New(InternalError{}, nil) + } + totalLength += int64(length) + go debug.FreeOSMemory() + } + if totalLength != size { + donut.multiPartObjects[uploadID].Delete(partID) + return "", iodine.New(IncompleteBody{Bucket: bucket, Object: key}, nil) } if err != io.EOF { return "", iodine.New(err, nil) } - go debug.FreeOSMemory() - md5SumBytes := hash.Sum(nil) - totalLength := int64(len(readBytes)) - - donut.multiPartObjects.Set(partID, readBytes) - // setting up for de-allocation - readBytes = nil + md5SumBytes := hash.Sum(nil) md5Sum := hex.EncodeToString(md5SumBytes) // Verify if the written object is equal to what is expected, only if it is requested as such if strings.TrimSpace(expectedMD5Sum) != "" { @@ -186,7 +193,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont func (donut API) cleanupMultipartSession(bucket, key, uploadID string) { storedBucket := donut.storedBuckets.Get(bucket).(storedBucket) for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ { - donut.multiPartObjects.Delete(i) + donut.multiPartObjects[uploadID].Delete(i) } delete(storedBucket.multiPartSession, key) donut.storedBuckets.Set(bucket, storedBucket) @@ -218,7 +225,7 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map var fullObject bytes.Buffer for i := 1; i <= len(parts); i++ { recvMD5 := parts[i] - object, ok := donut.multiPartObjects.Get(i) + object, ok := donut.multiPartObjects[uploadID].Get(i) if ok == false { donut.lock.Unlock() return ObjectMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) diff --git a/pkg/donut/signature-v4.go b/pkg/donut/signature-v4.go index 83d643fba..cfdd04cf0 100644 --- a/pkg/donut/signature-v4.go +++ b/pkg/donut/signature-v4.go @@ -124,7 +124,7 @@ func (r *request) getHashedPayload() string { } } hashedPayload := hash() - r.calculatedReq.Header.Set("X-Amz-Content-Sha256", hashedPayload) + r.calculatedReq.Header.Set("x-amz-content-sha256", hashedPayload) return hashedPayload } @@ -241,8 +241,8 @@ func (r *request) getSignature(signingKey []byte, stringToSign string) string { func (r *request) SignV4() (string, error) { // Add date if not present var date string - if date = r.calculatedReq.Header.Get("Date"); date == "" { - if date = r.calculatedReq.Header.Get("X-Amz-Date"); date == "" { + if date = r.calculatedReq.Header.Get("x-amz-date"); date == "" { + if date = r.calculatedReq.Header.Get("Date"); date == "" { return "", iodine.New(MissingDateHeader{}, nil) } } diff --git a/pkg/server/api/object-handlers.go b/pkg/server/api/object-handlers.go index 210fe0fcc..8299d7285 100644 --- a/pkg/server/api/object-handlers.go +++ b/pkg/server/api/object-handlers.go @@ -218,6 +218,8 @@ func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) { writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) case donut.BadDigest: writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path) + case donut.IncompleteBody: + writeErrorResponse(w, req, IncompleteBody, acceptsContentType, req.URL.Path) case donut.EntityTooLarge: writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path) case donut.InvalidDigest: @@ -340,6 +342,8 @@ func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request) writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) case donut.BadDigest: writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path) + case donut.IncompleteBody: + writeErrorResponse(w, req, IncompleteBody, acceptsContentType, req.URL.Path) case donut.EntityTooLarge: writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path) case donut.InvalidDigest: