Handle partNumberMarker with listObjectParts now and other fixes

master
Harshavardhana 10 years ago
parent 729447794a
commit 14b5828505
  1. 69
      pkg/api/api_object_handlers.go
  2. 2
      pkg/storage/drivers/donut/donut.go
  3. 2
      pkg/storage/drivers/driver.go
  4. 23
      pkg/storage/drivers/memory/memory.go
  5. 4
      pkg/storage/drivers/mocks/Driver.go

@ -27,7 +27,10 @@ import (
"github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers" "github.com/minio-io/minio/pkg/storage/drivers"
"github.com/minio-io/minio/pkg/utils/log" "github.com/minio-io/minio/pkg/utils/log"
"strings" )
const (
maxPartsList = 1000
) )
// GET Object // GET Object
@ -235,13 +238,9 @@ func (server *minioAPI) newMultipartUploadHandler(w http.ResponseWriter, req *ht
vars := mux.Vars(req) vars := mux.Vars(req)
bucket = vars["bucket"] bucket = vars["bucket"]
object = vars["object"] object = vars["object"]
var uploadID string uploadID, err := server.driver.NewMultipartUpload(bucket, object, "")
var err error switch err := iodine.ToError(err).(type) {
if uploadID, err = server.driver.NewMultipartUpload(bucket, object, ""); err != nil { case nil:
log.Println(iodine.New(err, nil))
writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
return
}
response := generateInitiateMultipartUploadResult(bucket, object, uploadID) response := generateInitiateMultipartUploadResult(bucket, object, uploadID)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers // write headers
@ -251,6 +250,12 @@ func (server *minioAPI) newMultipartUploadHandler(w http.ResponseWriter, req *ht
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
// write body // write body
w.Write(encodedSuccessResponse) w.Write(encodedSuccessResponse)
case drivers.ObjectExists:
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
default:
log.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
} }
func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Request) { func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Request) {
@ -295,16 +300,21 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
writeErrorResponse(w, req, InvalidRequest, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InvalidRequest, acceptsContentType, req.URL.Path)
return return
} }
var object, bucket string
vars := mux.Vars(req) vars := mux.Vars(req)
bucket = vars["bucket"] bucket := vars["bucket"]
object = vars["object"] object := vars["object"]
uploadID := vars["uploadId"]
// workaround for mux not splitting on & properly var uploadID, partIDString string
if len(uploadID) > 1 { for key, value := range req.URL.Query() {
uploadID = strings.Split(uploadID, "&")[0] switch true {
case key == "uploadId":
uploadID = value[0]
case key == "partNumber":
partIDString = value[0]
}
} }
partIDString := vars["partNumber"]
partID, err := strconv.Atoi(partIDString) partID, err := strconv.Atoi(partIDString)
if err != nil { if err != nil {
writeErrorResponse(w, req, InvalidPart, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InvalidPart, acceptsContentType, req.URL.Path)
@ -337,11 +347,6 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
{ {
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
} }
case drivers.ImplementationError:
{
log.Error.Println(err)
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
default: default:
{ {
log.Error.Println(err) log.Error.Println(err)
@ -359,9 +364,10 @@ func (server *minioAPI) abortMultipartUploadHandler(w http.ResponseWriter, req *
vars := mux.Vars(req) vars := mux.Vars(req)
bucket := vars["bucket"] bucket := vars["bucket"]
object := vars["object"] object := vars["object"]
uploadID := vars["uploadId"]
err := server.driver.AbortMultipartUpload(bucket, object, uploadID) objectResourcesMetadata := getObjectResources(req.URL.Query())
err := server.driver.AbortMultipartUpload(bucket, object, objectResourcesMetadata.UploadID)
switch err := iodine.ToError(err).(type) { switch err := iodine.ToError(err).(type) {
case nil: case nil:
setCommonHeaders(w, getContentTypeString(acceptsContentType)) setCommonHeaders(w, getContentTypeString(acceptsContentType))
@ -383,9 +389,12 @@ func (server *minioAPI) listObjectPartsHandler(w http.ResponseWriter, req *http.
vars := mux.Vars(req) vars := mux.Vars(req)
bucket := vars["bucket"] bucket := vars["bucket"]
object := vars["object"] object := vars["object"]
uploadID := vars["uploadId"] objectResourcesMetadata := getObjectResources(req.URL.Query())
if objectResourcesMetadata.MaxParts == 0 {
objectResourcesMetadata.MaxParts = maxPartsList
}
objectResourcesMetadata, err := server.driver.ListObjectParts(bucket, object, uploadID) objectResourcesMetadata, err := server.driver.ListObjectParts(bucket, object, objectResourcesMetadata)
switch err := iodine.ToError(err).(type) { switch err := iodine.ToError(err).(type) {
case nil: case nil:
response := generateListPartsResult(objectResourcesMetadata) response := generateListPartsResult(objectResourcesMetadata)
@ -428,13 +437,14 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
vars := mux.Vars(req) vars := mux.Vars(req)
bucket := vars["bucket"] bucket := vars["bucket"]
object := vars["object"] object := vars["object"]
uploadID := vars["uploadId"] objectResourcesMetadata := getObjectResources(req.URL.Query())
partMap := make(map[int]string) partMap := make(map[int]string)
for _, part := range parts.Part { for _, part := range parts.Part {
partMap[part.PartNumber] = part.ETag partMap[part.PartNumber] = part.ETag
} }
etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap)
etag, err := server.driver.CompleteMultipartUpload(bucket, object, objectResourcesMetadata.UploadID, partMap)
switch err := iodine.ToError(err).(type) { switch err := iodine.ToError(err).(type) {
case nil: case nil:
response := generateCompleteMultpartUploadResult(bucket, object, "", etag) response := generateCompleteMultpartUploadResult(bucket, object, "", etag)
@ -453,8 +463,3 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
} }
} }
func (server *minioAPI) notImplementedHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
writeErrorResponse(w, req, NotImplemented, acceptsContentType, req.URL.Path)
}

@ -412,7 +412,7 @@ func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts
return "", iodine.New(errors.New("Not Implemented"), nil) return "", iodine.New(errors.New("Not Implemented"), nil)
} }
func (d donutDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { func (d donutDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) {
return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil) return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil)
} }

@ -44,7 +44,7 @@ type Driver interface {
AbortMultipartUpload(bucket, key, UploadID string) error AbortMultipartUpload(bucket, key, UploadID string) error
CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error)
ListObjectParts(bucket, key, uploadID string) (ObjectResourcesMetadata, error) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error)
} }
// BucketACL - bucket level access control // BucketACL - bucket level access control

@ -685,7 +685,7 @@ func (a partNumber) Len() int { return len(a) }
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
func (memory *memoryDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { func (memory *memoryDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) {
// Verify upload id // Verify upload id
memory.lock.RLock() memory.lock.RLock()
defer memory.lock.RUnlock() defer memory.lock.RUnlock()
@ -693,24 +693,29 @@ func (memory *memoryDriver) ListObjectParts(bucket, key, uploadID string) (drive
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
} }
storedBucket := memory.storedBuckets[bucket] storedBucket := memory.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID { if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil)
} }
// TODO support PartNumberMarker and NextPartNumberMarker objectResourcesMetadata := resources
objectResourcesMetadata := drivers.ObjectResourcesMetadata{}
objectResourcesMetadata.UploadID = uploadID
objectResourcesMetadata.Bucket = bucket objectResourcesMetadata.Bucket = bucket
objectResourcesMetadata.Key = key objectResourcesMetadata.Key = key
objectResourcesMetadata.MaxParts = 1000
var parts []*drivers.PartMetadata var parts []*drivers.PartMetadata
for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ { var startPartNumber int
switch {
case objectResourcesMetadata.PartNumberMarker == 0:
startPartNumber = 1
default:
startPartNumber = objectResourcesMetadata.PartNumberMarker
}
for i := startPartNumber; i <= storedBucket.multiPartSession[key].totalParts; i++ {
if len(parts) > objectResourcesMetadata.MaxParts { if len(parts) > objectResourcesMetadata.MaxParts {
sort.Sort(partNumber(parts)) sort.Sort(partNumber(parts))
objectResourcesMetadata.IsTruncated = true objectResourcesMetadata.IsTruncated = true
objectResourcesMetadata.Part = parts objectResourcesMetadata.Part = parts
objectResourcesMetadata.NextPartNumberMarker = i
return objectResourcesMetadata, nil return objectResourcesMetadata, nil
} }
object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)] object, ok := storedBucket.objectMetadata[resources.Bucket+"/"+getMultipartKey(resources.Key, resources.UploadID, i)]
if !ok { if !ok {
return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
} }

@ -158,8 +158,8 @@ func (m *Driver) CompleteMultipartUpload(bucket, key, uploadID string, parts map
} }
// ListObjectParts is a mock // ListObjectParts is a mock
func (m *Driver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { func (m *Driver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) {
ret := m.Called(bucket, key, uploadID) ret := m.Called(bucket, key, resources)
r0 := ret.Get(0).(drivers.ObjectResourcesMetadata) r0 := ret.Get(0).(drivers.ObjectResourcesMetadata)
r1 := ret.Error(1) r1 := ret.Error(1)

Loading…
Cancel
Save