Merge pull request #722 from harshavardhana/pr_out_http_header_content_length_signifies_body_length_of_the_request_if_its_smaller_reply_appropriately

HTTP header Content-Length signifies body length of the request, if its smaller reply appropriately
master
Harshavardhana 10 years ago
commit 51d2d8e221
  1. 35
      pkg/donut/donut-v2.go
  2. 8
      pkg/donut/errors.go
  3. 31
      pkg/donut/multipart.go
  4. 6
      pkg/donut/signature-v4.go
  5. 4
      pkg/server/api/object-handlers.go

@ -25,6 +25,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http"
"runtime/debug" "runtime/debug"
"sort" "sort"
"strconv" "strconv"
@ -54,9 +55,10 @@ type Config struct {
// API - local variables // API - local variables
type API struct { type API struct {
config *Config config *Config
req *http.Request
lock *sync.Mutex lock *sync.Mutex
objects *data.Cache objects *data.Cache
multiPartObjects *data.Cache multiPartObjects map[string]*data.Cache
storedBuckets *metadata.Cache storedBuckets *metadata.Cache
nodes map[string]node nodes map[string]node
buckets map[string]bucket buckets map[string]bucket
@ -91,9 +93,8 @@ func New() (Interface, error) {
a.nodes = make(map[string]node) a.nodes = make(map[string]node)
a.buckets = make(map[string]bucket) a.buckets = make(map[string]bucket)
a.objects = data.NewCache(a.config.MaxSize) a.objects = data.NewCache(a.config.MaxSize)
a.multiPartObjects = data.NewCache(0) a.multiPartObjects = make(map[string]*data.Cache)
a.objects.OnEvicted = a.evictedObject a.objects.OnEvicted = a.evictedObject
a.multiPartObjects.OnEvicted = a.evictedPart
a.lock = new(sync.Mutex) a.lock = new(sync.Mutex)
if len(a.config.NodeDiskMap) > 0 { if len(a.config.NodeDiskMap) > 0 {
@ -113,16 +114,21 @@ func New() (Interface, error) {
} }
for k, v := range buckets { for k, v := range buckets {
var newBucket = storedBucket{} var newBucket = storedBucket{}
newBucket.bucketMetadata = v
newBucket.objectMetadata = make(map[string]ObjectMetadata) newBucket.objectMetadata = make(map[string]ObjectMetadata)
newBucket.multiPartSession = make(map[string]MultiPartSession) newBucket.multiPartSession = make(map[string]MultiPartSession)
newBucket.partMetadata = make(map[int]PartMetadata) newBucket.partMetadata = make(map[int]PartMetadata)
newBucket.bucketMetadata = v
a.storedBuckets.Set(k, newBucket) a.storedBuckets.Set(k, newBucket)
} }
} }
return a, nil return a, nil
} }
// SetRequest API for setting request header
func (donut API) SetRequest(req *http.Request) {
donut.req = req
}
// GetObject - GET object from cache buffer // GetObject - GET object from cache buffer
func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) { func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) {
donut.lock.Lock() donut.lock.Lock()
@ -344,7 +350,16 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
} }
if len(donut.config.NodeDiskMap) > 0 { if len(donut.config.NodeDiskMap) > 0 {
objMetadata, err := donut.putObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType}) objMetadata, err := donut.putObject(
bucket,
key,
expectedMD5Sum,
data,
map[string]string{
"contentType": contentType,
"contentLength": strconv.FormatInt(size, 10),
},
)
if err != nil { if err != nil {
return ObjectMetadata{}, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
@ -356,7 +371,7 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
hash := md5.New() hash := md5.New()
var err error var err error
var totalLength int var totalLength int64
for err == nil { for err == nil {
var length int var length int
byteBuffer := make([]byte, 1024*1024) byteBuffer := make([]byte, 1024*1024)
@ -371,13 +386,17 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
if !ok { if !ok {
return ObjectMetadata{}, iodine.New(InternalError{}, nil) return ObjectMetadata{}, iodine.New(InternalError{}, nil)
} }
totalLength += length totalLength += int64(length)
go debug.FreeOSMemory() go debug.FreeOSMemory()
} }
if totalLength != size {
// Delete perhaps the object is already saved, due to the nature of append()
donut.objects.Delete(objectKey)
return ObjectMetadata{}, iodine.New(IncompleteBody{Bucket: bucket, Object: key}, nil)
}
if err != io.EOF { if err != io.EOF {
return ObjectMetadata{}, iodine.New(err, nil) return ObjectMetadata{}, iodine.New(err, nil)
} }
md5SumBytes := hash.Sum(nil) md5SumBytes := hash.Sum(nil)
md5Sum := hex.EncodeToString(md5SumBytes) md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such // Verify if the written object is equal to what is expected, only if it is requested as such

@ -269,6 +269,14 @@ func (e EntityTooLarge) Error() string {
return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.MaxSize return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.MaxSize
} }
// IncompleteBody You did not provide the number of bytes specified by the Content-Length HTTP header
type IncompleteBody GenericObjectError
// Return string an error formatted as the given text
func (e IncompleteBody) Error() string {
return e.Bucket + "#" + e.Object + "has incomplete body"
}
// Return string an error formatted as the given text // Return string an error formatted as the given text
func (e BackendCorrupted) Error() string { func (e BackendCorrupted) Error() string {
return "Backend corrupted: " + e.Path return "Backend corrupted: " + e.Path

@ -31,6 +31,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/minio/minio/pkg/donut/cache/data"
"github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/iodine"
) )
@ -62,6 +63,9 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er
initiated: time.Now(), initiated: time.Now(),
totalParts: 0, totalParts: 0,
} }
multiPartCache := data.NewCache(0)
multiPartCache.OnEvicted = donut.evictedPart
donut.multiPartObjects[uploadID] = multiPartCache
donut.storedBuckets.Set(bucket, storedBucket) donut.storedBuckets.Set(bucket, storedBucket)
return uploadID, nil return uploadID, nil
} }
@ -134,11 +138,11 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
// calculate md5 // calculate md5
hash := md5.New() hash := md5.New()
var readBytes []byte
var err error var err error
var length int var totalLength int64
for err == nil { for err == nil {
var length int
byteBuffer := make([]byte, 1024*1024) byteBuffer := make([]byte, 1024*1024)
length, err = data.Read(byteBuffer) length, err = data.Read(byteBuffer)
// While hash.Write() wouldn't mind a Nil byteBuffer // While hash.Write() wouldn't mind a Nil byteBuffer
@ -147,19 +151,22 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
break break
} }
hash.Write(byteBuffer[0:length]) hash.Write(byteBuffer[0:length])
readBytes = append(readBytes, byteBuffer[0:length]...) ok := donut.multiPartObjects[uploadID].Append(partID, byteBuffer[0:length])
if !ok {
return "", iodine.New(InternalError{}, nil)
}
totalLength += int64(length)
go debug.FreeOSMemory()
}
if totalLength != size {
donut.multiPartObjects[uploadID].Delete(partID)
return "", iodine.New(IncompleteBody{Bucket: bucket, Object: key}, nil)
} }
if err != io.EOF { if err != io.EOF {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
} }
go debug.FreeOSMemory()
md5SumBytes := hash.Sum(nil)
totalLength := int64(len(readBytes))
donut.multiPartObjects.Set(partID, readBytes)
// setting up for de-allocation
readBytes = nil
md5SumBytes := hash.Sum(nil)
md5Sum := hex.EncodeToString(md5SumBytes) md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such // Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" { if strings.TrimSpace(expectedMD5Sum) != "" {
@ -186,7 +193,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
func (donut API) cleanupMultipartSession(bucket, key, uploadID string) { func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket) storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ { for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ {
donut.multiPartObjects.Delete(i) donut.multiPartObjects[uploadID].Delete(i)
} }
delete(storedBucket.multiPartSession, key) delete(storedBucket.multiPartSession, key)
donut.storedBuckets.Set(bucket, storedBucket) donut.storedBuckets.Set(bucket, storedBucket)
@ -218,7 +225,7 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
var fullObject bytes.Buffer var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ { for i := 1; i <= len(parts); i++ {
recvMD5 := parts[i] recvMD5 := parts[i]
object, ok := donut.multiPartObjects.Get(i) object, ok := donut.multiPartObjects[uploadID].Get(i)
if ok == false { if ok == false {
donut.lock.Unlock() donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) return ObjectMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)

@ -124,7 +124,7 @@ func (r *request) getHashedPayload() string {
} }
} }
hashedPayload := hash() hashedPayload := hash()
r.calculatedReq.Header.Set("X-Amz-Content-Sha256", hashedPayload) r.calculatedReq.Header.Set("x-amz-content-sha256", hashedPayload)
return hashedPayload return hashedPayload
} }
@ -241,8 +241,8 @@ func (r *request) getSignature(signingKey []byte, stringToSign string) string {
func (r *request) SignV4() (string, error) { func (r *request) SignV4() (string, error) {
// Add date if not present // Add date if not present
var date string var date string
if date = r.calculatedReq.Header.Get("Date"); date == "" { if date = r.calculatedReq.Header.Get("x-amz-date"); date == "" {
if date = r.calculatedReq.Header.Get("X-Amz-Date"); date == "" { if date = r.calculatedReq.Header.Get("Date"); date == "" {
return "", iodine.New(MissingDateHeader{}, nil) return "", iodine.New(MissingDateHeader{}, nil)
} }
} }

@ -218,6 +218,8 @@ func (api Minio) PutObjectHandler(w http.ResponseWriter, req *http.Request) {
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
case donut.BadDigest: case donut.BadDigest:
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
case donut.IncompleteBody:
writeErrorResponse(w, req, IncompleteBody, acceptsContentType, req.URL.Path)
case donut.EntityTooLarge: case donut.EntityTooLarge:
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
case donut.InvalidDigest: case donut.InvalidDigest:
@ -340,6 +342,8 @@ func (api Minio) PutObjectPartHandler(w http.ResponseWriter, req *http.Request)
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
case donut.BadDigest: case donut.BadDigest:
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
case donut.IncompleteBody:
writeErrorResponse(w, req, IncompleteBody, acceptsContentType, req.URL.Path)
case donut.EntityTooLarge: case donut.EntityTooLarge:
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
case donut.InvalidDigest: case donut.InvalidDigest:

Loading…
Cancel
Save