Update totalnumber of multiparts inside CreateObjectPart(), also introduce support for UploadIDMarker and Prefix for listMultipartUploads

master
Harshavardhana 10 years ago
parent b3e0b3e4dc
commit e4c0d574dc
  1. 13
      pkg/api/api_bucket_handlers.go
  2. 35
      pkg/api/api_object_handlers.go
  3. 59
      pkg/storage/drivers/memory/memory_multipart.go

@ -56,6 +56,11 @@ func (server *minioAPI) isValidOp(w http.ResponseWriter, req *http.Request, acce
//return false //return false
} }
} }
default:
{
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
} }
return true return true
} }
@ -79,7 +84,7 @@ func (server *minioAPI) listMultipartUploadsHandler(w http.ResponseWriter, req *
bucket := vars["bucket"] bucket := vars["bucket"]
resources, err := server.driver.ListMultipartUploads(bucket, resources) resources, err := server.driver.ListMultipartUploads(bucket, resources)
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: // success case nil: // success
{ {
// generate response // generate response
@ -130,7 +135,7 @@ func (server *minioAPI) listObjectsHandler(w http.ResponseWriter, req *http.Requ
bucket := vars["bucket"] bucket := vars["bucket"]
objects, resources, err := server.driver.ListObjects(bucket, resources) objects, resources, err := server.driver.ListObjects(bucket, resources)
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: // success case nil: // success
{ {
// generate response // generate response
@ -170,7 +175,7 @@ func (server *minioAPI) listBucketsHandler(w http.ResponseWriter, req *http.Requ
// return // return
// } // }
buckets, err := server.driver.ListBuckets() buckets, err := server.driver.ListBuckets()
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: case nil:
{ {
// generate response // generate response
@ -307,7 +312,7 @@ func (server *minioAPI) headBucketHandler(w http.ResponseWriter, req *http.Reque
} }
default: default:
{ {
log.Println(err) log.Error.Println(iodine.New(err, nil))
error := getErrorCode(InternalError) error := getErrorCode(InternalError)
w.WriteHeader(error.HTTPStatusCode) w.WriteHeader(error.HTTPStatusCode)
} }

@ -50,7 +50,7 @@ func (server *minioAPI) getObjectHandler(w http.ResponseWriter, req *http.Reques
object = vars["object"] object = vars["object"]
metadata, err := server.driver.GetObjectMetadata(bucket, object) metadata, err := server.driver.GetObjectMetadata(bucket, object)
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: // success case nil: // success
{ {
httpRange, err := getRequestedRange(req, metadata.Size) httpRange, err := getRequestedRange(req, metadata.Size)
@ -63,7 +63,7 @@ func (server *minioAPI) getObjectHandler(w http.ResponseWriter, req *http.Reques
setObjectHeaders(w, metadata) setObjectHeaders(w, metadata)
if _, err := server.driver.GetObject(w, bucket, object); err != nil { if _, err := server.driver.GetObject(w, bucket, object); err != nil {
// unable to write headers, we've already printed data. Just close the connection. // unable to write headers, we've already printed data. Just close the connection.
log.Error.Println(err) log.Error.Println(iodine.New(err, nil))
} }
case false: case false:
metadata.Size = httpRange.length metadata.Size = httpRange.length
@ -107,7 +107,7 @@ func (server *minioAPI) headObjectHandler(w http.ResponseWriter, req *http.Reque
object = vars["object"] object = vars["object"]
metadata, err := server.driver.GetObjectMetadata(bucket, object) metadata, err := server.driver.GetObjectMetadata(bucket, object)
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: case nil:
{ {
setObjectHeaders(w, metadata) setObjectHeaders(w, metadata)
@ -178,7 +178,7 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
return return
} }
calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt64, req.Body) calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt64, req.Body)
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: case nil:
{ {
w.Header().Set("ETag", calculatedMD5) w.Header().Set("ETag", calculatedMD5)
@ -201,14 +201,9 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
{ {
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
} }
case drivers.ImplementationError:
{
log.Error.Println(err)
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
default: default:
{ {
log.Error.Println(err) log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
} }
} }
@ -231,7 +226,7 @@ func (server *minioAPI) newMultipartUploadHandler(w http.ResponseWriter, req *ht
bucket = vars["bucket"] bucket = vars["bucket"]
object = vars["object"] object = vars["object"]
uploadID, err := server.driver.NewMultipartUpload(bucket, object, "") uploadID, err := server.driver.NewMultipartUpload(bucket, object, "")
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: case nil:
{ {
response := generateInitiateMultipartUploadResult(bucket, object, uploadID) response := generateInitiateMultipartUploadResult(bucket, object, uploadID)
@ -247,7 +242,7 @@ func (server *minioAPI) newMultipartUploadHandler(w http.ResponseWriter, req *ht
} }
default: default:
{ {
log.Println(iodine.New(err, nil)) log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
} }
} }
@ -307,7 +302,7 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
writeErrorResponse(w, req, InvalidPart, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InvalidPart, acceptsContentType, req.URL.Path)
} }
calculatedMD5, err := server.driver.CreateObjectPart(bucket, object, uploadID, partID, "", md5, sizeInt64, req.Body) calculatedMD5, err := server.driver.CreateObjectPart(bucket, object, uploadID, partID, "", md5, sizeInt64, req.Body)
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: case nil:
{ {
w.Header().Set("ETag", calculatedMD5) w.Header().Set("ETag", calculatedMD5)
@ -336,7 +331,7 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
} }
default: default:
{ {
log.Error.Println(err) log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
} }
} }
@ -356,7 +351,7 @@ func (server *minioAPI) abortMultipartUploadHandler(w http.ResponseWriter, req *
objectResourcesMetadata := getObjectResources(req.URL.Query()) objectResourcesMetadata := getObjectResources(req.URL.Query())
err := server.driver.AbortMultipartUpload(bucket, object, objectResourcesMetadata.UploadID) err := server.driver.AbortMultipartUpload(bucket, object, objectResourcesMetadata.UploadID)
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: case nil:
{ {
setCommonHeaders(w, getContentTypeString(acceptsContentType), 0) setCommonHeaders(w, getContentTypeString(acceptsContentType), 0)
@ -368,7 +363,7 @@ func (server *minioAPI) abortMultipartUploadHandler(w http.ResponseWriter, req *
} }
default: default:
{ {
log.Println(err) log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
} }
} }
@ -391,7 +386,7 @@ func (server *minioAPI) listObjectPartsHandler(w http.ResponseWriter, req *http.
object := vars["object"] object := vars["object"]
objectResourcesMetadata, err := server.driver.ListObjectParts(bucket, object, objectResourcesMetadata) objectResourcesMetadata, err := server.driver.ListObjectParts(bucket, object, objectResourcesMetadata)
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: case nil:
{ {
response := generateListPartsResult(objectResourcesMetadata) response := generateListPartsResult(objectResourcesMetadata)
@ -407,7 +402,7 @@ func (server *minioAPI) listObjectPartsHandler(w http.ResponseWriter, req *http.
} }
default: default:
{ {
log.Println(err) log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
} }
} }
@ -444,7 +439,7 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
} }
etag, err := server.driver.CompleteMultipartUpload(bucket, object, objectResourcesMetadata.UploadID, partMap) etag, err := server.driver.CompleteMultipartUpload(bucket, object, objectResourcesMetadata.UploadID, partMap)
switch err := iodine.ToError(err).(type) { switch iodine.ToError(err).(type) {
case nil: case nil:
{ {
response := generateCompleteMultpartUploadResult(bucket, object, "", etag) response := generateCompleteMultpartUploadResult(bucket, object, "", etag)
@ -460,7 +455,7 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
} }
default: default:
{ {
log.Println(iodine.New(err, nil)) log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
} }
} }

@ -105,10 +105,6 @@ func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partI
if err != nil { if err != nil {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
} }
// once successful, update totalParts
multiPartSession := storedBucket.multiPartSession[key]
multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession
// free // free
debug.FreeOSMemory() debug.FreeOSMemory()
return etag, nil return etag, nil
@ -204,6 +200,9 @@ func (memory *memoryDriver) createObjectPart(bucket, key, uploadID string, partI
default: default:
storedBucket.partMetadata[partKey] = newPart storedBucket.partMetadata[partKey] = newPart
} }
multiPartSession := storedBucket.multiPartSession[key]
multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession
memory.storedBuckets[bucket] = storedBucket memory.storedBuckets[bucket] = storedBucket
memory.lock.Unlock() memory.lock.Unlock()
@ -294,7 +293,7 @@ func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key } func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
func (memory *memoryDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { func (memory *memoryDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
// TODO handle delimiter, prefix, uploadIDMarker // TODO handle delimiter
memory.lock.RLock() memory.lock.RLock()
defer memory.lock.RUnlock() defer memory.lock.RUnlock()
if _, ok := memory.storedBuckets[bucket]; ok == false { if _, ok := memory.storedBuckets[bucket]; ok == false {
@ -304,20 +303,42 @@ func (memory *memoryDriver) ListMultipartUploads(bucket string, resources driver
var uploads []*drivers.UploadMetadata var uploads []*drivers.UploadMetadata
for key, session := range storedBucket.multiPartSession { for key, session := range storedBucket.multiPartSession {
if len(uploads) > resources.MaxUploads { if strings.HasPrefix(key, resources.Prefix) {
sort.Sort(byKey(uploads)) if len(uploads) > resources.MaxUploads {
resources.Upload = uploads sort.Sort(byKey(uploads))
resources.NextKeyMarker = key resources.Upload = uploads
resources.NextUploadIDMarker = session.uploadID resources.NextKeyMarker = key
resources.IsTruncated = true resources.NextUploadIDMarker = session.uploadID
return resources, nil resources.IsTruncated = true
} return resources, nil
if key > resources.KeyMarker { }
upload := new(drivers.UploadMetadata) // uploadIDMarker is ignored if KeyMarker is empty
upload.Key = key switch {
upload.UploadID = session.uploadID case resources.KeyMarker != "" && resources.UploadIDMarker == "":
upload.Initiated = session.initiated if key > resources.KeyMarker {
uploads = append(uploads, upload) upload := new(drivers.UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
if session.uploadID > resources.UploadIDMarker {
if key >= resources.KeyMarker {
upload := new(drivers.UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
}
default:
upload := new(drivers.UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
} }
} }
sort.Sort(byKey(uploads)) sort.Sort(byKey(uploads))

Loading…
Cancel
Save