Merge pull request #578 from harshavardhana/pr_out_now_objectstorage_go_works_properly_with_multipart_upload

Now objectstorage-go works properly with Multipart upload.
master
Harshavardhana 10 years ago
commit 39e0875699
  1. 16
      pkg/api/api_definitions.go
  2. 50
      pkg/api/api_object_handlers.go
  3. 9
      pkg/api/api_response.go
  4. 8
      pkg/api/errors.go
  5. 16
      pkg/api/utils.go
  6. 5
      pkg/storage/drivers/donut/donut.go
  7. 2
      pkg/storage/drivers/driver.go
  8. 27
      pkg/storage/drivers/memory/memory.go
  9. 7
      pkg/storage/drivers/mocks/Driver.go

@ -95,7 +95,7 @@ type Owner struct {
DisplayName string
}
// InitiateMultipartUploadResult - Returns upload id to use
// InitiateMultipartUploadResult container for InitiateMultiPartUpload response, provides uploadID to start MultiPart upload
type InitiateMultipartUploadResult struct {
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 InitiateMultipartUploadResult" json:"-"`
@ -104,12 +104,22 @@ type InitiateMultipartUploadResult struct {
UploadID string `xml:"UploadId"`
}
// CompleteMultipartUpload - Construct object from uploaded parts
// CompleteMultipartUpload container for completing multipart upload
type CompleteMultipartUpload struct {
Part []Part
}
// Part - Description of a multipart part
// CompleteMultipartUploadResult container for completed multipart upload response
type CompleteMultipartUploadResult struct {
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 CompleteMultipartUploadResult" json:"-"`
Location string
Bucket string
Key string
ETag string
}
// Part description of a multipart part
type Part struct {
PartNumber int
ETag string

@ -21,6 +21,7 @@ import (
"strconv"
"encoding/xml"
"github.com/gorilla/mux"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers"
@ -173,8 +174,11 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path)
return
}
// ignoring error here, TODO find a way to reply back if we can
sizeInt64, _ := strconv.ParseInt(size, 10, 64)
sizeInt64, err := strconv.ParseInt(size, 10, 64)
if err != nil {
writeErrorResponse(w, req, InvalidRequest, acceptsContentType, req.URL.Path)
return
}
calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt64, req.Body)
switch err := iodine.ToError(err).(type) {
case nil:
@ -271,19 +275,24 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
writeErrorResponse(w, req, MissingContentLength, acceptsContentType, req.URL.Path)
return
}
/// maximum Upload size for objects in a single operation
/// maximum Upload size for multipart objects in a single operation
if isMaxObjectSize(size) {
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
return
}
/// minimum Upload size for objects in a single operation
if isMinObjectSize(size) {
writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path)
// last part can be less than < 5MB so we need to figure out a way to handle it first
// and then enable below code (y4m4)
//
/// minimum Upload size for multipart objects in a single operation
// if isMinMultipartObjectSize(size) {
// writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path)
// return
// }
sizeInt64, err := strconv.ParseInt(size, 10, 64)
if err != nil {
writeErrorResponse(w, req, InvalidRequest, acceptsContentType, req.URL.Path)
return
}
// ignoring error here, TODO find a way to reply back if we can
sizeInt64, _ := strconv.ParseInt(size, 10, 64)
var object, bucket string
vars := mux.Vars(req)
bucket = vars["bucket"]
@ -292,8 +301,7 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
partIDString := vars["partNumber"]
partID, err := strconv.Atoi(partIDString)
if err != nil {
// TODO find the write value for this error
writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
writeErrorResponse(w, req, InvalidPart, acceptsContentType, req.URL.Path)
}
calculatedMD5, err := server.driver.CreateObjectPart(bucket, object, uploadID, partID, "", md5, sizeInt64, req.Body)
switch err := iodine.ToError(err).(type) {
@ -345,6 +353,7 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
if err != nil {
log.Error.Println(err)
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
return
}
partMap := make(map[int]string)
@ -357,8 +366,23 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
for _, part := range parts.Part {
partMap[part.PartNumber] = part.ETag
}
err = server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap)
etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap)
switch err := iodine.ToError(err).(type) {
case nil:
response := generateCompleteMultpartUploadResult(bucket, object, "", etag)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType))
// set content-length to the size of the body
w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse)))
w.WriteHeader(http.StatusOK)
// write body
w.Write(encodedSuccessResponse)
default:
// TODO handle all other errors, properly
log.Println(err)
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
func (server *minioAPI) notImplementedHandler(w http.ResponseWriter, req *http.Request) {

@ -120,6 +120,15 @@ func generateInitiateMultipartUploadResult(bucket, key, uploadID string) Initiat
}
}
func generateCompleteMultpartUploadResult(bucket, key, location, etag string) CompleteMultipartUploadResult {
return CompleteMultipartUploadResult{
Location: location,
Bucket: bucket,
Key: key,
ETag: etag,
}
}
// writeSuccessResponse - write success headers
func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) {
setCommonHeaders(w, getContentTypeString(acceptsContentType))

@ -63,11 +63,12 @@ const (
SignatureDoesNotMatch
TooManyBuckets
MethodNotAllowed
InvalidPart
)
// Error codes, non exhaustive list - standard HTTP errors
const (
NotAcceptable = iota + 23
NotAcceptable = iota + 24
)
// Error code to Error structure map
@ -187,6 +188,11 @@ var errorCodeResponse = map[int]Error{
Description: "The requested resource is only capable of generating content not acceptable according to the Accept headers sent in the request.",
HTTPStatusCode: http.StatusNotAcceptable,
},
InvalidPart: {
Code: "InvalidPart",
Description: "One or more of the specified parts could not be found",
HTTPStatusCode: http.StatusBadRequest,
},
}
// errorCodeError provides errorCode to Error. It returns empty if the code provided is unknown

@ -35,11 +35,11 @@ func isValidMD5(md5 string) bool {
}
/// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
// these should be configurable?
const (
// maximum object size per PUT request is 5GB
maxObjectSize = 1024 * 1024 * 1024 * 5
// mimimum object size per Multipart PUT request is 5MB
minMultiPartObjectSize = 1024 * 1024 * 5
// minimum object size per PUT request is 1B
minObjectSize = 1
)
@ -67,3 +67,15 @@ func isMinObjectSize(size string) bool {
}
return false
}
// isMinMultipartObjectSize - verify if the uploaded multipart is of minimum size
func isMinMultipartObjectSize(size string) bool {
i, err := strconv.ParseInt(size, 10, 64)
if err != nil {
return true
}
if i < minMultiPartObjectSize {
return true
}
return false
}

@ -30,6 +30,7 @@ import (
"io/ioutil"
"errors"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/donut"
"github.com/minio-io/minio/pkg/storage/drivers"
@ -407,6 +408,6 @@ func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int,
return "", iodine.New(errors.New("Not Implemented"), nil)
}
func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error {
return iodine.New(errors.New("Not Implemented"), nil)
func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
return "", iodine.New(errors.New("Not Implemented"), nil)
}

@ -42,7 +42,7 @@ type Driver interface {
// Object Multipart Operations
NewMultipartUpload(bucket string, key string, contentType string) (string, error)
CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error
CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) (string, error)
}
// BucketACL - bucket level access control

@ -195,10 +195,10 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
}
func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
humanReadableErr, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
md5sum, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
// free
debug.FreeOSMemory()
return humanReadableErr, iodine.New(err, nil)
return md5sum, iodine.New(err, nil)
}
// getMD5AndData - this is written as a wrapper to capture md5sum and data in a more memory efficient way
@ -497,15 +497,18 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive
func (memory *memoryDriver) evictObject(a ...interface{}) {
cacheStats := memory.objects.Stats()
log.Printf("CurrenSize: %d, CurrentItems: %d, TotalEvictions: %d",
log.Printf("CurrentSize: %d, CurrentItems: %d, TotalEvictions: %d",
cacheStats.Bytes, cacheStats.Items, cacheStats.Evictions)
key := a[0].(string)
// loop through all buckets
for bucket, storedBucket := range memory.storedBuckets {
for _, storedBucket := range memory.storedBuckets {
delete(storedBucket.objectMetadata, key)
// remove bucket if no objects found anymore
if len(storedBucket.objectMetadata) == 0 {
delete(memory.storedBuckets, bucket)
// TODO (y4m4)
// for now refrain from deleting buckets, due to multipart deletes before fullobject being written
// this case gets trigerred and we can't store the actual data at all receiving 404 on the client
// delete(memory.storedBuckets, bucket)
}
}
debug.FreeOSMemory()
@ -531,7 +534,7 @@ func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partI
return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
}
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error {
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
// TODO verify upload id exists
memory.lock.Lock()
size := int64(0)
@ -542,7 +545,7 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
}
} else {
memory.lock.Unlock()
return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
}
@ -552,13 +555,16 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
if _, ok := parts[i]; ok {
if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true {
obj := object.([]byte)
io.Copy(&fullObject, bytes.NewBuffer(obj))
_, err := io.Copy(&fullObject, bytes.NewBuffer(obj))
if err != nil {
return "", iodine.New(err, nil)
}
} else {
log.Println("Cannot fetch: ", getMultipartKey(key, uploadID, i))
}
} else {
memory.lock.Unlock()
return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
}
@ -573,6 +579,5 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
md5sumSlice := md5.Sum(fullObject.Bytes())
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
_, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject)
return err
return memory.CreateObject(bucket, key, "", md5sum, size, &fullObject)
}

@ -146,10 +146,11 @@ func (m *Driver) CreateObjectPart(bucket string, key string, uploadID string, pa
}
// CompleteMultipartUpload is a mock
func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error {
func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) (string, error) {
ret := m.Called(bucket, key, uploadID, parts)
r0 := ret.Error(0)
r0 := ret.Get(0).(string)
r1 := ret.Error(1)
return r0
return r0, r1
}

Loading…
Cancel
Save