listMultipart: implement support for marker. (#1313)

master
Krishna Srinivas 9 years ago committed by Harshavardhana
parent 30b0b4deba
commit caa35f68fa
  1. 132
      object-api-multipart.go
  2. 8
      object-datatypes.go
  3. 3
      object-handlers.go

@ -109,43 +109,115 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke
if delimiter == slashPathSeparator { if delimiter == slashPathSeparator {
recursive = false recursive = false
} }
result.IsTruncated = true
newMaxUploads := 0
prefixPath := bucket + slashPathSeparator + prefix // do not use filepath.Join so that we retain trailing '/' if any
prefixPath := path.Join(bucket, prefix) + slashPathSeparator if recursive {
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, keyMarker+uploadIDMarker, recursive, maxUploads) keyMarkerPath := filepath.Join(keyMarker, uploadIDMarker)
if e != nil { outerLoop:
return ListMultipartsInfo{}, probe.NewError(e) for {
} fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, keyMarkerPath, recursive, maxUploads-newMaxUploads)
if e != nil {
result.IsTruncated = !eof return ListMultipartsInfo{}, probe.NewError(e)
for _, fileInfo := range fileInfos {
if fileInfo.Mode.IsDir() {
isLeaf, fis := o.checkLeafDirectory(fileInfo.Name)
if isLeaf {
fileName := strings.Replace(fileInfo.Name, bucket+slashPathSeparator, "", 1)
fileName = path.Clean(fileName)
for _, newFileInfo := range fis {
newFileName := path.Base(newFileInfo.Name)
result.Uploads = append(result.Uploads, uploadMetadata{
Object: fileName,
UploadID: newFileName,
Initiated: newFileInfo.ModTime,
})
}
} else {
dirName := strings.Replace(fileInfo.Name, bucket+slashPathSeparator, "", 1)
result.CommonPrefixes = append(result.CommonPrefixes, dirName+slashPathSeparator)
} }
} else { for _, fi := range fileInfos {
fileName := path.Base(fileInfo.Name) keyMarkerPath = fi.Name
fileDir := strings.Replace(path.Dir(fileInfo.Name), bucket+slashPathSeparator, "", 1) fileName := filepath.Base(fi.Name)
if !strings.Contains(fileName, ".") { if strings.Contains(fileName, ".") {
// fileName contains partnumber and md5sum info, skip this.
continue
}
result.Uploads = append(result.Uploads, uploadMetadata{ result.Uploads = append(result.Uploads, uploadMetadata{
Object: fileDir, Object: filepath.Dir(fi.Name),
UploadID: fileName, UploadID: fileName,
Initiated: fileInfo.ModTime, Initiated: fi.ModTime,
}) })
result.NextKeyMarker = filepath.Dir(fi.Name)
result.NextUploadIDMarker = fileName
newMaxUploads++
if newMaxUploads == maxUploads {
break outerLoop
}
}
if eof {
result.IsTruncated = false
break outerLoop
}
}
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
}
var fileInfos []FileInfo
// read all the "fileInfos" in the prefix
for {
marker := ""
fis, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, marker, recursive, 1000)
if e != nil {
return ListMultipartsInfo{}, probe.NewError(e)
}
for _, fi := range fis {
marker = fi.Name
if fi.Mode.IsDir() {
fileInfos = append(fileInfos, fi)
} }
} }
if eof {
break
}
}
// Create "uploads" slice from "fileInfos" slice.
var uploads []uploadMetadata
for _, fi := range fileInfos {
leaf, entries := o.checkLeafDirectory(fi.Name)
if leaf {
for _, entry := range entries {
if strings.Contains(entry.Name, ".") {
continue
}
uploads = append(uploads, uploadMetadata{
Object: strings.TrimSuffix(fi.Name, slashPathSeparator),
UploadID: entry.Name,
Initiated: entry.ModTime,
})
}
continue
}
uploads = append(uploads, uploadMetadata{
Object: fi.Name,
})
}
index := 0
for i, upload := range uploads {
index = i
if upload.Object > keyMarker {
break
}
if uploads[index].Object == keyMarker && uploadIDMarker != "" {
if upload.UploadID > uploadIDMarker {
break
}
}
}
for ; index < len(uploads); index++ {
newMaxUploads++
if newMaxUploads == maxUploads {
break
}
if strings.HasSuffix(uploads[index].Object, slashPathSeparator) {
result.CommonPrefixes = append(result.CommonPrefixes, uploads[index].Object)
continue
}
result.Uploads = append(result.Uploads, uploads[index])
}
result.MaxUploads = newMaxUploads
result.IsTruncated = true
if index >= len(uploads)-1 {
result.IsTruncated = false
} }
return result, nil return result, nil
} }

@ -16,10 +16,7 @@
package main package main
import ( import "time"
"encoding/xml"
"time"
)
// BucketInfo - bucket name and create date // BucketInfo - bucket name and create date
type BucketInfo struct { type BucketInfo struct {
@ -107,6 +104,5 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part
// completeMultipartUpload container for completing multipart upload // completeMultipartUpload container for completing multipart upload
type completeMultipartUpload struct { type completeMultipartUpload struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUpload" json:"-"` Parts []completePart `xml:"Part"`
Parts []completePart `xml:"Part"`
} }

@ -1012,8 +1012,10 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
writeErrorResponse(w, r, ErrInternalError, r.URL.Path) writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
return return
} }
fmt.Println(string(completeMultipartBytes))
complMultipartUpload := &completeMultipartUpload{} complMultipartUpload := &completeMultipartUpload{}
if e = xml.Unmarshal(completeMultipartBytes, complMultipartUpload); e != nil { if e = xml.Unmarshal(completeMultipartBytes, complMultipartUpload); e != nil {
errorIf(probe.NewError(e), "XML Unmarshal failed", nil)
writeErrorResponse(w, r, ErrMalformedXML, r.URL.Path) writeErrorResponse(w, r, ErrMalformedXML, r.URL.Path)
return return
} }
@ -1028,7 +1030,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
part.ETag = strings.TrimSuffix(part.ETag, "\"") part.ETag = strings.TrimSuffix(part.ETag, "\"")
completeParts = append(completeParts, part) completeParts = append(completeParts, part)
} }
// Complete multipart upload. // Complete multipart upload.
objInfo, err = api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts) objInfo, err = api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
if err != nil { if err != nil {

Loading…
Cancel
Save