HTTP header Content-Length signifies body length of the request, if its smaller reply appropriately

This patch also handles large individual part sizes > 5MB by using less memory copies.
master
Harshavardhana 10 years ago
parent cea093bf65
commit 375860077d
  1. 35
      pkg/donut/donut-v2.go
  2. 8
      pkg/donut/errors.go
  3. 31
      pkg/donut/multipart.go
  4. 6
      pkg/donut/signature-v4.go
  5. 4
      pkg/server/api/object-handlers.go

@ -25,6 +25,7 @@ import (
"io"
"io/ioutil"
"log"
"net/http"
"runtime/debug"
"sort"
"strconv"
@ -54,9 +55,10 @@ type Config struct {
// API - local variables
type API struct {
config *Config
req *http.Request
lock *sync.Mutex
objects *data.Cache
multiPartObjects *data.Cache
multiPartObjects map[string]*data.Cache
storedBuckets *metadata.Cache
nodes map[string]node
buckets map[string]bucket
@ -91,9 +93,8 @@ func New() (Interface, error) {
a.nodes = make(map[string]node)
a.buckets = make(map[string]bucket)
a.objects = data.NewCache(a.config.MaxSize)
a.multiPartObjects = data.NewCache(0)
a.multiPartObjects = make(map[string]*data.Cache)
a.objects.OnEvicted = a.evictedObject
a.multiPartObjects.OnEvicted = a.evictedPart
a.lock = new(sync.Mutex)
if len(a.config.NodeDiskMap) > 0 {
@ -113,16 +114,21 @@ func New() (Interface, error) {
}
for k, v := range buckets {
var newBucket = storedBucket{}
newBucket.bucketMetadata = v
newBucket.objectMetadata = make(map[string]ObjectMetadata)
newBucket.multiPartSession = make(map[string]MultiPartSession)
newBucket.partMetadata = make(map[int]PartMetadata)
newBucket.bucketMetadata = v
a.storedBuckets.Set(k, newBucket)
}
}
return a, nil
}
// SetRequest API for setting request header
func (donut API) SetRequest(req *http.Request) {
donut.req = req
}
// GetObject - GET object from cache buffer
func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) {
donut.lock.Lock()
@ -344,7 +350,16 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
}
if len(donut.config.NodeDiskMap) > 0 {
objMetadata, err := donut.putObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType})
objMetadata, err := donut.putObject(
bucket,
key,
expectedMD5Sum,
data,
map[string]string{
"contentType": contentType,
"contentLength": strconv.FormatInt(size, 10),
},
)
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
@ -356,7 +371,7 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
hash := md5.New()
var err error
var totalLength int
var totalLength int64
for err == nil {
var length int
byteBuffer := make([]byte, 1024*1024)
@ -371,13 +386,17 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
if !ok {
return ObjectMetadata{}, iodine.New(InternalError{}, nil)
}
totalLength += length
totalLength += int64(length)
go debug.FreeOSMemory()
}
if totalLength != size {
// Delete perhaps the object is already saved, due to the nature of append()
donut.objects.Delete(objectKey)
return ObjectMetadata{}, iodine.New(IncompleteBody{Bucket: bucket, Object: key}, nil)
}
if err != io.EOF {
return ObjectMetadata{}, iodine.New(err, nil)
}
md5SumBytes := hash.Sum(nil)
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such

@ -269,6 +269,14 @@ func (e EntityTooLarge) Error() string {
return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.MaxSize
}
// IncompleteBody You did not provide the number of bytes specified by the Content-Length HTTP header
type IncompleteBody GenericObjectError
// Return string an error formatted as the given text
func (e IncompleteBody) Error() string {
return e.Bucket + "#" + e.Object + "has incomplete body"
}
// Return string an error formatted as the given text
func (e BackendCorrupted) Error() string {
return "Backend corrupted: " + e.Path

@ -31,6 +31,7 @@ import (
"strings"
"time"
"github.com/minio/minio/pkg/donut/cache/data"
"github.com/minio/minio/pkg/iodine"
)
@ -62,6 +63,9 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er
initiated: time.Now(),
totalParts: 0,
}
multiPartCache := data.NewCache(0)
multiPartCache.OnEvicted = donut.evictedPart
donut.multiPartObjects[uploadID] = multiPartCache
donut.storedBuckets.Set(bucket, storedBucket)
return uploadID, nil
}
@ -134,11 +138,11 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
// calculate md5
hash := md5.New()
var readBytes []byte
var err error
var length int
var totalLength int64
for err == nil {
var length int
byteBuffer := make([]byte, 1024*1024)
length, err = data.Read(byteBuffer)
// While hash.Write() wouldn't mind a Nil byteBuffer
@ -147,19 +151,22 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
break
}
hash.Write(byteBuffer[0:length])
readBytes = append(readBytes, byteBuffer[0:length]...)
ok := donut.multiPartObjects[uploadID].Append(partID, byteBuffer[0:length])
if !ok {
return "", iodine.New(InternalError{}, nil)
}
totalLength += int64(length)
go debug.FreeOSMemory()
}
if totalLength != size {
donut.multiPartObjects[uploadID].Delete(partID)
return "", iodine.New(IncompleteBody{Bucket: bucket, Object: key}, nil)
}
if err != io.EOF {
return "", iodine.New(err, nil)
}
go debug.FreeOSMemory()
md5SumBytes := hash.Sum(nil)
totalLength := int64(len(readBytes))
donut.multiPartObjects.Set(partID, readBytes)
// setting up for de-allocation
readBytes = nil
md5SumBytes := hash.Sum(nil)
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
@ -186,7 +193,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ {
donut.multiPartObjects.Delete(i)
donut.multiPartObjects[uploadID].Delete(i)
}
delete(storedBucket.multiPartSession, key)
donut.storedBuckets.Set(bucket, storedBucket)
@ -218,7 +225,7 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ {
recvMD5 := parts[i]
object, ok := donut.multiPartObjects.Get(i)
object, ok := donut.multiPartObjects[uploadID].Get(i)
if ok == false {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)

@ -124,7 +124,7 @@ func (r *request) getHashedPayload() string {
}
}
hashedPayload := hash()
r.calculatedReq.Header.Set("X-Amz-Content-Sha256", hashedPayload)
r.calculatedReq.Header.Set("x-amz-content-sha256", hashedPayload)
return hashedPayload
}
@ -241,8 +241,8 @@ func (r *request) getSignature(signingKey []byte, stringToSign string) string {
func (r *request) SignV4() (string, error) {
// Add date if not present
var date string
if date = r.calculatedReq.Header.Get("Date"); date == "" {
if date = r.calculatedReq.Header.Get("X-Amz-Date"); date == "" {
if date = r.calculatedReq.Header.Get("x-amz-date"); date == "" {
if date = r.calculatedReq.Header.Get("Date"); date == "" {
return "", iodine.New(MissingDateHeader{}, nil)
}
}

@ -218,6 +218,8 @@ func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) {
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
case donut.BadDigest:
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
case donut.IncompleteBody:
writeErrorResponse(w, req, IncompleteBody, acceptsContentType, req.URL.Path)
case donut.EntityTooLarge:
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
case donut.InvalidDigest:
@ -340,6 +342,8 @@ func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request)
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
case donut.BadDigest:
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
case donut.IncompleteBody:
writeErrorResponse(w, req, IncompleteBody, acceptsContentType, req.URL.Path)
case donut.EntityTooLarge:
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
case donut.InvalidDigest:

Loading…
Cancel
Save