Merge pull request #483 from harshavardhana/pr_out_memory_now_handles_submitting_large_files_fixes_482

master
Harshavardhana 10 years ago
commit b3a9d8b058
  1. 2
      Makefile
  2. 4
      pkg/api/api_object_handlers.go
  3. 12
      pkg/storage/drivers/errors.go
  4. 87
      pkg/storage/drivers/memory/memory.go

@ -29,7 +29,7 @@ lint:
cyclo: cyclo:
@echo "Running $@:" @echo "Running $@:"
@test -z "$$(gocyclo -over 15 . | grep -v Godeps/_workspace/src/ | tee /dev/stderr)" @test -z "$$(gocyclo -over 16 . | grep -v Godeps/_workspace/src/ | tee /dev/stderr)"
pre-build: pre-build:
@echo "Running pre-build:" @echo "Running pre-build:"

@ -169,6 +169,10 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
{ {
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
} }
case drivers.EntityTooLarge:
{
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
}
case drivers.InvalidDigest: case drivers.InvalidDigest:
{ {
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)

@ -86,6 +86,13 @@ type ObjectNotFound GenericObjectError
// ObjectExists - object already exists // ObjectExists - object already exists
type ObjectExists GenericObjectError type ObjectExists GenericObjectError
// EntityTooLarge - object size exceeds maximum limit
type EntityTooLarge struct {
GenericObjectError
Size string
TotalSize string
}
// ObjectNameInvalid - object name provided is invalid // ObjectNameInvalid - object name provided is invalid
type ObjectNameInvalid GenericObjectError type ObjectNameInvalid GenericObjectError
@ -152,6 +159,11 @@ func (e ObjectNameInvalid) Error() string {
return "Object name invalid: " + e.Bucket + "#" + e.Object return "Object name invalid: " + e.Bucket + "#" + e.Object
} }
// Return string an error formatted as the given text
func (e EntityTooLarge) Error() string {
return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.TotalSize
}
// Return string an error formatted as the given text // Return string an error formatted as the given text
func (e BackendCorrupted) Error() string { func (e BackendCorrupted) Error() string {
return "Backend corrupted: " + e.Path return "Backend corrupted: " + e.Path

@ -21,12 +21,15 @@ import (
"bytes" "bytes"
"io" "io"
"sort" "sort"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"crypto/md5" "crypto/md5"
"encoding/base64"
"encoding/hex" "encoding/hex"
"errors"
"io/ioutil" "io/ioutil"
@ -34,6 +37,7 @@ import (
"github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers" "github.com/minio-io/minio/pkg/storage/drivers"
"github.com/minio-io/minio/pkg/utils/log" "github.com/minio-io/minio/pkg/utils/log"
"github.com/minio-io/minio/pkg/utils/split"
) )
// memoryDriver - local variables // memoryDriver - local variables
@ -146,8 +150,27 @@ func (memory *memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMeta
return memory.bucketMetadata[bucket].metadata, nil return memory.bucketMetadata[bucket].metadata, nil
} }
// isMD5SumEqual - returns error if md5sum mismatches, success its `nil`
func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
return iodine.New(errors.New("bad digest, md5sum mismatch"), nil)
}
return nil
}
return iodine.New(errors.New("invalid argument"), nil)
}
// CreateObject - PUT object to memory buffer // CreateObject - PUT object to memory buffer
func (memory *memoryDriver) CreateObject(bucket, key, contentType, md5sum string, data io.Reader) error { func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) error {
memory.lock.RLock() memory.lock.RLock()
if !drivers.IsValidBucket(bucket) { if !drivers.IsValidBucket(bucket) {
memory.lock.RUnlock() memory.lock.RUnlock()
@ -174,31 +197,61 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, md5sum string
contentType = strings.TrimSpace(contentType) contentType = strings.TrimSpace(contentType)
memory.lock.Lock()
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil)
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
var bytesBuffer bytes.Buffer var bytesBuffer bytes.Buffer
var newObject = storedObject{} var newObject = storedObject{}
var dataSlice []byte
if _, ok := io.Copy(&bytesBuffer, data); ok == nil { chunks := split.Stream(data, 10*1024*1024)
size := bytesBuffer.Len() totalLength := 0
md5SumBytes := md5.Sum(bytesBuffer.Bytes()) summer := md5.New()
md5Sum := hex.EncodeToString(md5SumBytes[:]) for chunk := range chunks {
newObject.metadata = drivers.ObjectMetadata{ if chunk.Err == nil {
Bucket: bucket, totalLength = totalLength + len(chunk.Data)
Key: key, summer.Write(chunk.Data)
_, err := io.Copy(&bytesBuffer, bytes.NewBuffer(chunk.Data))
ContentType: contentType, if err != nil {
Created: time.Now(), return iodine.New(err, nil)
Md5: md5Sum, }
Size: int64(size), if uint64(totalLength) > memory.maxSize {
return iodine.New(drivers.EntityTooLarge{
Size: strconv.FormatInt(int64(totalLength), 10),
TotalSize: strconv.FormatUint(memory.totalSize, 10),
}, nil)
}
} }
dataSlice = bytesBuffer.Bytes()
} }
memory.lock.Lock() md5SumBytes := summer.Sum(nil)
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
return iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil)
}
}
newObject.metadata = drivers.ObjectMetadata{
Bucket: bucket,
Key: key,
ContentType: contentType,
Created: time.Now(),
Md5: md5Sum,
Size: int64(totalLength),
}
if _, ok := memory.objectMetadata[objectKey]; ok == true { if _, ok := memory.objectMetadata[objectKey]; ok == true {
memory.lock.Unlock() memory.lock.Unlock()
return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil)
} }
memory.objectMetadata[objectKey] = newObject memory.objectMetadata[objectKey] = newObject
memory.objects.Add(objectKey, dataSlice) memory.objects.Add(objectKey, bytesBuffer.Bytes())
memory.totalSize = memory.totalSize + uint64(newObject.metadata.Size) memory.totalSize = memory.totalSize + uint64(newObject.metadata.Size)
for memory.totalSize > memory.maxSize { for memory.totalSize > memory.maxSize {
memory.objects.RemoveOldest() memory.objects.RemoveOldest()

Loading…
Cancel
Save