Merge pull request #583 from harshavardhana/pr_out_add_listparts_support_wip_do_not_merge

master
Harshavardhana 10 years ago
commit 9d407f6ee1
  1. 40
      pkg/api/api_definitions.go
  2. 31
      pkg/api/api_object_handlers.go
  3. 43
      pkg/api/api_response.go
  4. 6
      pkg/api/api_router.go
  5. 17
      pkg/api/resources.go
  6. 4
      pkg/storage/drivers/donut/donut.go
  7. 42
      pkg/storage/drivers/driver.go
  8. 112
      pkg/storage/drivers/memory/memory.go
  9. 25
      pkg/storage/drivers/mocks/Driver.go

@ -55,6 +55,29 @@ type ListObjectsResponse struct {
Prefix string
}
// ListPartsResponse - format for list parts response
type ListPartsResponse struct {
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListPartsResult" json:"-"`
Bucket string
Key string
UploadID string `xml:"UploadId"`
Initiator Initiator
Owner Owner
// The class of storage used to store the object.
StorageClass string
PartNumberMarker int
NextPartNumberMarker int
MaxParts int
IsTruncated bool
// List of parts
Part []*Part
}
// ListBucketsResponse - format for list buckets response
type ListBucketsResponse struct {
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListAllMyBucketsResult" json:"-"`
@ -76,6 +99,14 @@ type Bucket struct {
CreationDate string
}
// Part container for part metadata
type Part struct {
PartNumber int
ETag string
LastModified string
Size int64
}
// Object container for object metadata
type Object struct {
ETag string
@ -89,6 +120,9 @@ type Object struct {
StorageClass string
}
// Initiator inherit from Owner struct, fields are same
type Initiator Owner
// Owner - bucket owner/principal
type Owner struct {
ID string
@ -126,12 +160,6 @@ type CompleteMultipartUploadResult struct {
ETag string
}
// Part description of a multipart part
type Part struct {
PartNumber int
ETag string
}
// List of not implemented bucket queries
var notimplementedBucketResourceNames = map[string]bool{
"policy": true,

@ -345,6 +345,37 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
}
}
func (server *minioAPI) listObjectPartsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
if acceptsContentType == unknownContentType {
writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
return
}
vars := mux.Vars(req)
bucket := vars["bucket"]
object := vars["object"]
uploadID := vars["uploadId"]
objectResourcesMetadata, err := server.driver.ListObjectParts(bucket, object, uploadID)
switch err := iodine.ToError(err).(type) {
case nil:
response := generateListPartsResult(objectResourcesMetadata)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType))
// set content-length to the size of the body
w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse)))
w.WriteHeader(http.StatusOK)
// write body
w.Write(encodedSuccessResponse)
case drivers.InvalidUploadID:
writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
default:
log.Println(err)
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
if acceptsContentType == unknownContentType {

@ -112,6 +112,7 @@ func generateListObjectsResponse(bucket string, objects []drivers.ObjectMetadata
return data
}
// generateInitiateMultipartUploadResult
func generateInitiateMultipartUploadResult(bucket, key, uploadID string) InitiateMultipartUploadResult {
return InitiateMultipartUploadResult{
Bucket: bucket,
@ -120,6 +121,7 @@ func generateInitiateMultipartUploadResult(bucket, key, uploadID string) Initiat
}
}
// generateCompleteMultipartUploadResult
func generateCompleteMultpartUploadResult(bucket, key, location, etag string) CompleteMultipartUploadResult {
return CompleteMultipartUploadResult{
Location: location,
@ -129,13 +131,50 @@ func generateCompleteMultpartUploadResult(bucket, key, location, etag string) Co
}
}
// writeSuccessResponse - write success headers
// partNumber
type partNumber []*Part
func (b partNumber) Len() int { return len(b) }
func (b partNumber) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b partNumber) Less(i, j int) bool { return b[i].PartNumber < b[j].PartNumber }
// generateListPartsResult
func generateListPartsResult(objectMetadata drivers.ObjectResourcesMetadata) ListPartsResponse {
// TODO - support EncodingType in xml decoding
listPartsResponse := ListPartsResponse{}
listPartsResponse.Bucket = objectMetadata.Bucket
listPartsResponse.Key = objectMetadata.Key
listPartsResponse.UploadID = objectMetadata.UploadID
listPartsResponse.StorageClass = "STANDARD"
listPartsResponse.Initiator.ID = "minio"
listPartsResponse.Initiator.DisplayName = "minio"
listPartsResponse.Owner.ID = "minio"
listPartsResponse.Owner.DisplayName = "minio"
listPartsResponse.MaxParts = objectMetadata.MaxParts
listPartsResponse.PartNumberMarker = objectMetadata.PartNumberMarker
listPartsResponse.IsTruncated = objectMetadata.IsTruncated
listPartsResponse.NextPartNumberMarker = objectMetadata.NextPartNumberMarker
listPartsResponse.Part = make([]*Part, len(objectMetadata.Part))
for _, part := range objectMetadata.Part {
newPart := &Part{}
newPart.PartNumber = part.PartNumber
newPart.ETag = part.ETag
newPart.Size = part.Size
newPart.LastModified = part.LastModified.Format(iso8601Format)
listPartsResponse.Part = append(listPartsResponse.Part, newPart)
}
return listPartsResponse
}
// writeSuccessResponse write success headers
func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) {
setCommonHeaders(w, getContentTypeString(acceptsContentType))
w.WriteHeader(http.StatusOK)
}
// writeErrorRespone - write error headers
// writeErrorRespone write error headers
func writeErrorResponse(w http.ResponseWriter, req *http.Request, errorType int, acceptsContentType contentType, resource string) {
error := getErrorCode(errorType)
// generate error response

@ -45,14 +45,16 @@ func HTTPHandler(driver drivers.Driver) http.Handler {
mux.HandleFunc("/{bucket}", api.listObjectsHandler).Methods("GET")
mux.HandleFunc("/{bucket}", api.putBucketHandler).Methods("PUT")
mux.HandleFunc("/{bucket}", api.headBucketHandler).Methods("HEAD")
mux.HandleFunc("/{bucket}/{object:.*}", api.getObjectHandler).Methods("GET")
mux.HandleFunc("/{bucket}/{object:.*}", api.headObjectHandler).Methods("HEAD")
if featureflags.Get(featureflags.MultipartPutObject) {
log.Println("Enabling feature", featureflags.MultipartPutObject)
mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT")
mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectPartHandler).Queries("partNumber",
"{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT")
mux.HandleFunc("/{bucket}/{object:.*}", api.listObjectPartsHandler).Queries("uploadId", "{uploadId:.*}").Methods("GET")
mux.HandleFunc("/{bucket}/{object:.*}", api.completeMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST")
mux.HandleFunc("/{bucket}/{object:.*}", api.newMultipartUploadHandler).Methods("POST")
}
mux.HandleFunc("/{bucket}/{object:.*}", api.getObjectHandler).Methods("GET")
mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectHandler).Methods("PUT")
var conf = config.Config{}

@ -42,6 +42,23 @@ func getBucketResources(values url.Values) (v drivers.BucketResourcesMetadata) {
return
}
// parse object url queries
func getObjectResources(values url.Values) (v drivers.ObjectResourcesMetadata) {
for key, value := range values {
switch true {
case key == "uploadId":
v.UploadID = value[0]
case key == "part-number-marker":
v.PartNumberMarker, _ = strconv.Atoi(value[0])
case key == "max-parts":
v.MaxParts, _ = strconv.Atoi(value[0])
case key == "encoding-type":
v.EncodingType = value[0]
}
}
return
}
// check if req query values have acl
func isRequestBucketACL(values url.Values) bool {
for key := range values {

@ -411,3 +411,7 @@ func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int,
func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
return "", iodine.New(errors.New("Not Implemented"), nil)
}
func (d donutDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) {
return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil)
}

@ -35,14 +35,15 @@ type Driver interface {
// Object Operations
GetObject(w io.Writer, bucket, object string) (int64, error)
GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error)
GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error)
GetObjectMetadata(bucket, key, prefix string) (ObjectMetadata, error)
ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error)
CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error)
CreateObject(bucket, key, contentType, md5sum string, size int64, data io.Reader) (string, error)
// Object Multipart Operations
NewMultipartUpload(bucket string, key string, contentType string) (string, error)
CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) (string, error)
NewMultipartUpload(bucket, key, contentType string) (string, error)
CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error)
ListObjectParts(bucket, key, uploadID string) (ObjectResourcesMetadata, error)
}
// BucketACL - bucket level access control
@ -103,6 +104,37 @@ const (
DefaultMode
)
// PartMetadata - various types of individual part resources
type PartMetadata struct {
PartNumber int
LastModified time.Time
ETag string
Size int64
}
// ObjectResourcesMetadata - various types of object resources
type ObjectResourcesMetadata struct {
Bucket string
EncodingType string
Key string
UploadID string
Initiator struct {
ID string
DisplayName string
}
Owner struct {
ID string
DisplayName string
}
StorageClass string
PartNumberMarker int
NextPartNumberMarker int
MaxParts int
IsTruncated bool
Part []*PartMetadata
}
// BucketResourcesMetadata - various types of bucket resources
type BucketResourcesMetadata struct {
Prefix string

@ -47,8 +47,14 @@ type memoryDriver struct {
}
type storedBucket struct {
bucketMetadata drivers.BucketMetadata
objectMetadata map[string]drivers.ObjectMetadata
bucketMetadata drivers.BucketMetadata
objectMetadata map[string]drivers.ObjectMetadata
multiPartSession map[string]multiPartSession
}
type multiPartSession struct {
totalParts int
uploadID string
}
const (
@ -533,19 +539,20 @@ func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string)
}
memory.lock.RUnlock()
memory.lock.Lock()
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])
md5sumBytes := md5.Sum([]byte(uploadID))
// CreateObject expects in base64 which is coming over http request header
// while all response headers with ETag are hex encoding
md5sum := base64.StdEncoding.EncodeToString(md5sumBytes[:])
// Create UploadID session, this is a temporary work around to instantiate a session.
// It would not be valid in future, since we need to work out proper sessions so that
// we can cleanly abort session and propagate failures.
_, err := memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID))
return uploadID, iodine.New(err, nil)
storedBucket.multiPartSession = make(map[string]multiPartSession)
storedBucket.multiPartSession[key] = multiPartSession{
uploadID: uploadID,
totalParts: 0,
}
memory.storedBuckets[bucket] = storedBucket
memory.lock.Unlock()
return uploadID, nil
}
func getMultipartKey(key string, uploadID string, partNumber int) string {
@ -554,17 +561,29 @@ func getMultipartKey(key string, uploadID string, partNumber int) string {
func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
// Verify upload id
_, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID)
if !ok {
memory.lock.RLock()
storedBucket := memory.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
memory.lock.RUnlock()
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
memory.lock.RUnlock()
etag, err := memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
if err != nil {
return "", iodine.New(err, nil)
}
// once successful, update totalParts
multiPartSession := storedBucket.multiPartSession[key]
multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession
return etag, nil
}
func (memory *memoryDriver) cleanupMultipartSession(bucket, key, uploadID string) {
memory.lock.Lock()
defer memory.lock.Unlock()
memory.objects.Delete(bucket + "/" + key + "?uploadId=" + uploadID)
delete(memory.storedBuckets[bucket].multiPartSession, key)
}
func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, parts map[int]string) {
@ -577,28 +596,29 @@ func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, part
}
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
// Verify upload id
_, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID)
if !ok {
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
if !drivers.IsValidBucket(bucket) {
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
}
if !drivers.IsValidObjectName(key) {
return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
}
// Verify upload id
memory.lock.RLock()
if _, ok := memory.storedBuckets[bucket]; ok == false {
memory.lock.RUnlock()
return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := memory.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
memory.lock.RUnlock()
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
memory.lock.RUnlock()
memory.lock.Lock()
var size int64
for i := range parts {
object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]
object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]
if !ok {
memory.lock.Unlock()
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
@ -643,3 +663,51 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
memory.cleanupMultipartSession(bucket, key, uploadID)
return etag, nil
}
// partNumber is a sortable interface for Part slice
type partNumber []*drivers.PartMetadata
func (a partNumber) Len() int { return len(a) }
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
func (memory *memoryDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) {
// Verify upload id
memory.lock.RLock()
defer memory.lock.RUnlock()
if _, ok := memory.storedBuckets[bucket]; ok == false {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := memory.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
// TODO support PartNumberMarker and NextPartNumberMarker
objectResourcesMetadata := drivers.ObjectResourcesMetadata{}
objectResourcesMetadata.UploadID = uploadID
objectResourcesMetadata.Bucket = bucket
objectResourcesMetadata.Key = key
objectResourcesMetadata.MaxParts = 1000
var parts []*drivers.PartMetadata
for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ {
if len(parts) > objectResourcesMetadata.MaxParts {
sort.Sort(partNumber(parts))
objectResourcesMetadata.IsTruncated = true
objectResourcesMetadata.Part = parts
return objectResourcesMetadata, nil
}
object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]
if !ok {
return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
partMetadata := &drivers.PartMetadata{}
partMetadata.Size = object.Size
partMetadata.LastModified = object.Created
partMetadata.ETag = object.Md5
partMetadata.PartNumber = i
parts = append(parts, partMetadata)
}
sort.Sort(partNumber(parts))
objectResourcesMetadata.Part = parts
return objectResourcesMetadata, nil
}

@ -56,11 +56,10 @@ func (m *Driver) SetBucketMetadata(bucket, acl string) error {
// SetGetObjectWriter is a mock
func (m *Driver) SetGetObjectWriter(bucket, object string, data []byte) {
m.ObjectWriterData[bucket+":"+object] = data
// println(string(m.ObjectWriterData["bucket:object"]))
}
// GetObject is a mock
func (m *Driver) GetObject(w io.Writer, bucket string, object string) (int64, error) {
func (m *Driver) GetObject(w io.Writer, bucket, object string) (int64, error) {
ret := m.Called(w, bucket, object)
r0 := ret.Get(0).(int64)
r1 := ret.Error(1)
@ -74,7 +73,7 @@ func (m *Driver) GetObject(w io.Writer, bucket string, object string) (int64, er
}
// GetPartialObject is a mock
func (m *Driver) GetPartialObject(w io.Writer, bucket string, object string, start int64, length int64) (int64, error) {
func (m *Driver) GetPartialObject(w io.Writer, bucket, object string, start int64, length int64) (int64, error) {
ret := m.Called(w, bucket, object, start, length)
r0 := ret.Get(0).(int64)
@ -95,7 +94,7 @@ func (m *Driver) GetPartialObject(w io.Writer, bucket string, object string, sta
}
// GetObjectMetadata is a mock
func (m *Driver) GetObjectMetadata(bucket string, object string, prefix string) (drivers.ObjectMetadata, error) {
func (m *Driver) GetObjectMetadata(bucket, object, prefix string) (drivers.ObjectMetadata, error) {
ret := m.Called(bucket, object, prefix)
r0 := ret.Get(0).(drivers.ObjectMetadata)
@ -116,7 +115,7 @@ func (m *Driver) ListObjects(bucket string, resources drivers.BucketResourcesMet
}
// CreateObject is a mock
func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error) {
func (m *Driver) CreateObject(bucket, key, contentType, md5sum string, size int64, data io.Reader) (string, error) {
ret := m.Called(bucket, key, contentType, md5sum, size, data)
r0 := ret.Get(0).(string)
@ -126,7 +125,7 @@ func (m *Driver) CreateObject(bucket string, key string, contentType string, md5
}
// NewMultipartUpload is a mock
func (m *Driver) NewMultipartUpload(bucket string, key string, contentType string) (string, error) {
func (m *Driver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
ret := m.Called(bucket, key, contentType)
r0 := ret.Get(0).(string)
@ -136,7 +135,7 @@ func (m *Driver) NewMultipartUpload(bucket string, key string, contentType strin
}
// CreateObjectPart is a mock
func (m *Driver) CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) {
func (m *Driver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) {
ret := m.Called(bucket, key, uploadID, partID, contentType, md5sum, size, data)
r0 := ret.Get(0).(string)
@ -146,7 +145,7 @@ func (m *Driver) CreateObjectPart(bucket string, key string, uploadID string, pa
}
// CompleteMultipartUpload is a mock
func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) (string, error) {
func (m *Driver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
ret := m.Called(bucket, key, uploadID, parts)
r0 := ret.Get(0).(string)
@ -154,3 +153,13 @@ func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID str
return r0, r1
}
// ListObjectParts is a mock
func (m *Driver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) {
ret := m.Called(bucket, key, uploadID)
r0 := ret.Get(0).(drivers.ObjectResourcesMetadata)
r1 := ret.Error(1)
return r0, r1
}

Loading…
Cancel
Save