FS/ListMultipart: Fix FS list-multipart to work for unit test cases.

Branch: master
Author: Krishna Srinivas, 8 years ago; committed by Harshavardhana
parent 616a257bfa
commit 6dc8323684
Changed files:
  1. fs-v1-multipart.go (207 changed lines)
  2. fs-v1.go (26 changed lines)
  3. test-utils_test.go (2 changed lines)
  4. tree-walk-fs.go (42 changed lines)
  5. xl-v1-multipart-common.go (138 changed lines)
  6. xl-v1-multipart.go (25 changed lines)

fs-v1-multipart.go:
@@ -92,22 +92,16 @@ func (fs fsObjects) newMultipartUploadCommon(bucket string, object string, meta
 	return uploadID, nil
 }
 
-func isMultipartObject(storage StorageAPI, bucket, prefix string) bool {
-	_, err := storage.StatFile(bucket, path.Join(prefix, fsMetaJSONFile))
-	if err != nil {
-		if err == errFileNotFound {
-			return false
-		}
-		errorIf(err, "Unable to access "+path.Join(prefix, fsMetaJSONFile))
-		return false
-	}
-	return true
+// Returns if the prefix is a multipart upload.
+func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
+	_, err := fs.storage.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
+	return err == nil
 }
 
 // listUploadsInfo - list all uploads info.
 func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) {
 	splitPrefixes := strings.SplitN(prefixPath, "/", 3)
-	uploadIDs, err := getUploadIDs(splitPrefixes[1], splitPrefixes[2], fs.storage)
+	uploadIDs, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], fs.storage)
	if err != nil {
		if err == errFileNotFound {
			return []uploadInfo{}, nil
@@ -118,96 +112,6 @@ func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, er
 	return uploads, nil
 }
 
-// listMetaBucketMultipart - list all objects at a given prefix inside minioMetaBucket.
-func (fs fsObjects) listMetaBucketMultipart(prefixPath string, markerPath string, recursive bool, maxKeys int) (fileInfos []FileInfo, eof bool, err error) {
-	walker := fs.lookupTreeWalk(listParams{minioMetaBucket, recursive, markerPath, prefixPath})
-	if walker == nil {
-		walker = fs.startTreeWalk(minioMetaBucket, prefixPath, markerPath, recursive)
-	}
-
-	// newMaxKeys tracks the size of entries which are going to be
-	// returned back.
-	var newMaxKeys int
-
-	// Following loop gathers and filters out special files inside
-	// minio meta volume.
-	for {
-		walkResult, ok := <-walker.ch
-		if !ok {
-			// Closed channel.
-			eof = true
-			break
-		}
-		// For any walk error return right away.
-		if walkResult.err != nil {
-			// File not found or Disk not found is a valid case.
-			if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound {
-				return nil, true, nil
-			}
-			return nil, false, toObjectErr(walkResult.err, minioMetaBucket, prefixPath)
-		}
-		fileInfo := walkResult.fileInfo
-		var uploads []uploadInfo
-		if fileInfo.Mode.IsDir() {
-			// List all the entries if fi.Name is a leaf directory, if
-			// fi.Name is not a leaf directory then the resulting
-			// entries are empty.
-			uploads, err = fs.listUploadsInfo(fileInfo.Name)
-			if err != nil {
-				return nil, false, err
-			}
-		}
-		if len(uploads) > 0 {
-			for _, upload := range uploads {
-				fileInfos = append(fileInfos, FileInfo{
-					Name:    path.Join(fileInfo.Name, upload.UploadID),
-					ModTime: upload.Initiated,
-				})
-				newMaxKeys++
-				// If we have reached the maxKeys, it means we have listed
-				// everything that was requested.
-				if newMaxKeys == maxKeys {
-					break
-				}
-			}
-		} else {
-			// We reach here for a non-recursive case non-leaf entry
-			// OR recursive case with fi.Name.
-			if !fileInfo.Mode.IsDir() { // Do not skip non-recursive case directory entries.
-				// Validate if 'fi.Name' is incomplete multipart.
-				if !strings.HasSuffix(fileInfo.Name, fsMetaJSONFile) {
-					continue
-				}
-				fileInfo.Name = path.Dir(fileInfo.Name)
-			}
-			fileInfos = append(fileInfos, fileInfo)
-			newMaxKeys++
-			// If we have reached the maxKeys, it means we have listed
-			// everything that was requested.
-			if newMaxKeys == maxKeys {
-				break
-			}
-		}
-	}
-
-	if !eof && len(fileInfos) != 0 {
-		// EOF has not reached, hence save the walker channel to the map so that the walker go routine
-		// can continue from where it left off for the next list request.
-		lastFileInfo := fileInfos[len(fileInfos)-1]
-		markerPath = lastFileInfo.Name
-		fs.saveTreeWalk(listParams{minioMetaBucket, recursive, markerPath, prefixPath}, walker)
-	}
-
-	// Return entries here.
-	return fileInfos, eof, nil
-}
-
-// FIXME: Currently the code sorts based on keyName/upload-id which is
-// not correct based on the S3 specs. According to s3 specs we are
-// supposed to only lexically sort keyNames and then for keyNames with
-// multiple upload ids should be sorted based on the initiated time.
-// Currently this case is not handled.
-
 // listMultipartUploadsCommon - lists all multipart uploads, common function for both object layers.
 func (fs fsObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
 	result := ListMultipartsInfo{}
@@ -259,9 +163,12 @@ func (fs fsObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload
 	result.IsTruncated = true
 	result.MaxUploads = maxUploads
+	result.KeyMarker = keyMarker
+	result.Prefix = prefix
+	result.Delimiter = delimiter
 
 	// Not using path.Join() as it strips off the trailing '/'.
-	multipartPrefixPath := pathJoin(mpartMetaPrefix, pathJoin(bucket, prefix))
+	multipartPrefixPath := pathJoin(mpartMetaPrefix, bucket, prefix)
 	if prefix == "" {
 		// Should have a trailing "/" if prefix is ""
 		// For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is ""
@@ -269,33 +176,83 @@
 	}
 	multipartMarkerPath := ""
 	if keyMarker != "" {
-		keyMarkerPath := pathJoin(pathJoin(bucket, keyMarker), uploadIDMarker)
-		multipartMarkerPath = pathJoin(mpartMetaPrefix, keyMarkerPath)
+		multipartMarkerPath = pathJoin(mpartMetaPrefix, bucket, keyMarker)
 	}
-
-	// List all the multipart files at prefixPath, starting with marker keyMarkerPath.
-	fileInfos, eof, err := fs.listMetaBucketMultipart(multipartPrefixPath, multipartMarkerPath, recursive, maxUploads)
-	if err != nil {
-		return ListMultipartsInfo{}, err
-	}
-
-	// Loop through all the received files fill in the multiparts result.
-	for _, fileInfo := range fileInfos {
+	var uploads []uploadMetadata
+	var err error
+	var eof bool
+	if uploadIDMarker != "" {
+		uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage)
+		if err != nil {
+			return ListMultipartsInfo{}, err
+		}
+		maxUploads = maxUploads - len(uploads)
+	}
+	if maxUploads > 0 {
+		walker := fs.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
+		if walker == nil {
+			walker = fs.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, func(bucket, object string) bool {
+				return fs.isMultipartUpload(bucket, object)
+			})
+		}
+		for maxUploads > 0 {
+			walkResult, ok := <-walker.ch
+			if !ok {
+				// Closed channel.
+				eof = true
+				break
+			}
+			// For any walk error return right away.
+			if walkResult.err != nil {
+				// File not found or Disk not found is a valid case.
+				if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound {
+					eof = true
+					break
+				}
+				return ListMultipartsInfo{}, err
+			}
+			entry := strings.TrimPrefix(walkResult.entry, retainSlash(pathJoin(mpartMetaPrefix, bucket)))
+			if strings.HasSuffix(walkResult.entry, slashSeparator) {
+				uploads = append(uploads, uploadMetadata{
+					Object: entry,
+				})
+				maxUploads--
+				if maxUploads == 0 {
+					if walkResult.end {
+						eof = true
+						break
+					}
+				}
+				continue
+			}
+			var tmpUploads []uploadMetadata
+			var end bool
+			uploadIDMarker = ""
+			tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage)
+			if err != nil {
+				return ListMultipartsInfo{}, err
+			}
+			uploads = append(uploads, tmpUploads...)
+			maxUploads -= len(tmpUploads)
+			if walkResult.end && end {
+				eof = true
+				break
+			}
+		}
+	}
+	// Loop through all the received uploads fill in the multiparts result.
+	for _, upload := range uploads {
 		var objectName string
 		var uploadID string
-		if fileInfo.Mode.IsDir() {
+		if strings.HasSuffix(upload.Object, slashSeparator) {
 			// All directory entries are common prefixes.
 			uploadID = "" // Upload ids are empty for CommonPrefixes.
-			objectName = strings.TrimPrefix(fileInfo.Name, retainSlash(pathJoin(mpartMetaPrefix, bucket)))
+			objectName = upload.Object
 			result.CommonPrefixes = append(result.CommonPrefixes, objectName)
 		} else {
-			uploadID = path.Base(fileInfo.Name)
-			objectName = strings.TrimPrefix(path.Dir(fileInfo.Name), retainSlash(pathJoin(mpartMetaPrefix, bucket)))
-			result.Uploads = append(result.Uploads, uploadMetadata{
-				Object:    objectName,
-				UploadID:  uploadID,
-				Initiated: fileInfo.ModTime,
-			})
+			uploadID = upload.UploadID
+			objectName = upload.Object
+			result.Uploads = append(result.Uploads, upload)
 		}
 		result.NextKeyMarker = objectName
 		result.NextUploadIDMarker = uploadID
@@ -639,13 +596,17 @@ func (fs fsObjects) abortMultipartUploadCommon(bucket, object, uploadID string)
 
 	// Validate if there are other incomplete upload-id's present for
 	// the object, if yes do not attempt to delete 'uploads.json'.
-	uploadIDs, err := getUploadIDs(bucket, object, fs.storage)
+	uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage)
 	if err == nil {
-		uploadIDIdx := uploadIDs.Index(uploadID)
+		uploadIDIdx := uploadsJSON.Index(uploadID)
 		if uploadIDIdx != -1 {
-			uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...)
+			uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...)
 		}
-		if len(uploadIDs.Uploads) > 0 {
+		if len(uploadsJSON.Uploads) > 0 {
+			err = updateUploadsJSON(bucket, object, uploadsJSON, fs.storage)
+			if err != nil {
+				return toObjectErr(err, bucket, object)
+			}
 			return nil
 		}
 	}
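
The fs-v1-multipart.go changes above come down to one idea: while walking ".minio/multipart/<bucket>/", any directory that contains an uploads.json is treated as a single incomplete multipart object, the walk does not descend into it, and the pending upload IDs are read out of that file. The following standalone sketch (not code from this commit; the metadata path, helper names, and use of the local filesystem instead of StorageAPI are assumptions for illustration) shows that leaf-detection idea:

// Sketch only: treat a directory holding uploads.json as one multipart object.
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// isMultipartLeaf reports whether dir holds an uploads.json marker file.
func isMultipartLeaf(dir string) bool {
	_, err := os.Stat(filepath.Join(dir, "uploads.json"))
	return err == nil
}

// listMultipartObjects walks metaDir and returns the directories that
// represent multipart objects (leaf directories containing uploads.json).
func listMultipartObjects(metaDir string) ([]string, error) {
	var objects []string
	err := filepath.Walk(metaDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() && isMultipartLeaf(path) {
			rel, _ := filepath.Rel(metaDir, path)
			objects = append(objects, rel)
			return filepath.SkipDir // do not descend into per-upload-id directories
		}
		return nil
	})
	return objects, err
}

func main() {
	objects, err := listMultipartObjects(".minio/multipart/mybucket")
	if err != nil {
		fmt.Println("walk error:", err)
		return
	}
	for _, o := range objects {
		fmt.Println("incomplete multipart object:", o)
	}
}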

fs-v1.go:
@@ -20,6 +20,7 @@ import (
 	"crypto/md5"
 	"encoding/hex"
 	"io"
+	"os"
 	"path/filepath"
 	"sort"
 	"strings"
@@ -289,6 +290,22 @@ func isBucketExist(storage StorageAPI, bucketName string) bool {
 }
 
 func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
+	// Convert entry to FileInfo
+	entryToFileInfo := func(entry string) (fileInfo FileInfo, err error) {
+		if strings.HasSuffix(entry, slashSeparator) {
+			// Object name needs to be full path.
+			fileInfo.Name = entry
+			fileInfo.Mode = os.ModeDir
+			return
+		}
+		if fileInfo, err = fs.storage.StatFile(bucket, entry); err != nil {
+			return
+		}
+		// Object name needs to be full path.
+		fileInfo.Name = entry
+		return
+	}
+
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
 		return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
@@ -334,7 +351,9 @@ func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxK
 	walker := fs.lookupTreeWalk(listParams{bucket, recursive, marker, prefix})
 	if walker == nil {
-		walker = fs.startTreeWalk(bucket, prefix, marker, recursive)
+		walker = fs.startTreeWalk(bucket, prefix, marker, recursive, func(bucket, object string) bool {
+			return !strings.HasSuffix(object, slashSeparator)
+		})
 	}
 	var fileInfos []FileInfo
 	var eof bool
@@ -354,7 +373,10 @@ func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxK
 			}
 			return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix)
 		}
-		fileInfo := walkResult.fileInfo
+		fileInfo, err := entryToFileInfo(walkResult.entry)
+		if err != nil {
+			return ListObjectsInfo{}, nil
+		}
 		nextMarker = fileInfo.Name
 		fileInfos = append(fileInfos, fileInfo)
 		if walkResult.end {
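
These listObjectsFS hunks move FileInfo construction out of the walker: the walker now emits bare entry names, and entryToFileInfo stats regular entries while flagging trailing-slash entries as directories. A rough, self-contained sketch of that conversion (local filesystem instead of StorageAPI; names and the objectInfo type below are illustrative assumptions):

// Sketch only: turn a walked entry name into object metadata.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

type objectInfo struct {
	Name  string
	Size  int64
	IsDir bool
}

// entryToObjectInfo marks trailing-slash entries as directories and stats
// everything else, mirroring the shape of entryToFileInfo in the diff above.
func entryToObjectInfo(root, entry string) (objectInfo, error) {
	if strings.HasSuffix(entry, "/") {
		// Directory entries carry no size; the name keeps its trailing slash.
		return objectInfo{Name: entry, IsDir: true}, nil
	}
	fi, err := os.Stat(filepath.Join(root, entry))
	if err != nil {
		return objectInfo{}, err
	}
	return objectInfo{Name: entry, Size: fi.Size()}, nil
}

func main() {
	info, err := entryToObjectInfo(".", "go.mod")
	fmt.Println(info, err)
}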

test-utils_test.go:
@@ -86,10 +86,8 @@ func ExecObjectLayerTest(t *testing.T, objTest func(obj ObjectLayer, instanceTyp
 		t.Fatalf("Initialization of object layer failed for single node setup: %s", err.Error())
 	}
 	// FIXME: enable FS tests after fixing it.
-	if false {
-		// Executing the object layer tests for single node setup.
-		objTest(objLayer, singleNodeTestStr, t)
-	}
+	// Executing the object layer tests for single node setup.
+	objTest(objLayer, singleNodeTestStr, t)
 
 	objLayer, fsDirs, err := getXLObjectLayer()
 	if err != nil {

tree-walk-fs.go:
@@ -17,7 +17,6 @@
 package main
 
 import (
-	"os"
 	"path"
 	"sort"
 	"strings"
@@ -34,34 +33,17 @@ type treeWalkerFS struct {
 
 // Tree walk result carries results of tree walking.
 type treeWalkResultFS struct {
-	fileInfo FileInfo
+	entry string
 	err   error
 	end   bool
 }
 
 // treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files.
-func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResultFS) bool, count *int) bool {
+func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResultFS) bool, count *int, isLeaf func(string, string) bool) bool {
 	// Example:
 	// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
 	// called with prefixDir="one/two/three/four/" and marker="five.txt"
-
-	// Convert entry to FileInfo
-	entryToFileInfo := func(entry string) (fileInfo FileInfo, err error) {
-		if strings.HasSuffix(entry, slashSeparator) {
-			// Object name needs to be full path.
-			fileInfo.Name = path.Join(prefixDir, entry)
-			fileInfo.Name += slashSeparator
-			fileInfo.Mode = os.ModeDir
-			return
-		}
-		if fileInfo, err = fs.storage.StatFile(bucket, path.Join(prefixDir, entry)); err != nil {
-			return
-		}
-		// Object name needs to be full path.
-		fileInfo.Name = path.Join(prefixDir, entry)
-		return
-	}
-
 	var markerBase, markerDir string
 	if marker != "" {
 		// Ex: if marker="four/five.txt", markerDir="four/" markerBase="five.txt"
@@ -78,11 +60,15 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
 		return false
 	}
 
-	if entryPrefixMatch != "" {
-		for i, entry := range entries {
+	for i, entry := range entries {
+		if entryPrefixMatch != "" {
 			if !strings.HasPrefix(entry, entryPrefixMatch) {
 				entries[i] = ""
+				continue
 			}
 		}
+		if isLeaf(bucket, pathJoin(prefixDir, entry)) {
+			entries[i] = strings.TrimSuffix(entry, slashSeparator)
+		}
 	}
 	sort.Strings(entries)
@@ -129,19 +115,13 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
 			}
 			*count--
 			prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories.
-			if !fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count) {
+			if !fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) {
 				return false
 			}
 			continue
 		}
 		*count--
-		fileInfo, err := entryToFileInfo(entry)
-		if err != nil {
-			// The file got deleted in the interim between ListDir() and StatFile()
-			// Ignore error and continue.
-			continue
-		}
-		if !send(treeWalkResultFS{fileInfo: fileInfo}) {
+		if !send(treeWalkResultFS{entry: pathJoin(prefixDir, entry)}) {
 			return false
 		}
 	}
@@ -149,7 +129,7 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string,
 }
 
 // Initiate a new treeWalk in a goroutine.
-func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool) *treeWalkerFS {
+func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalkerFS {
 	// Example 1
 	// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
 	// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
@@ -186,7 +166,7 @@ func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool)
 				return false
 			}
 		}
-		fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count)
+		fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf)
 	}()
 	return &walkNotify
 }
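
tree-walk-fs.go now threads an isLeaf callback through treeWalk/startTreeWalk and streams bare entry strings over the walker channel, instead of stat-ing every entry inside the walker. A simplified standalone sketch of that pattern (not the commit's code; startWalk, walkResult, and the use of the local filesystem are assumptions):

// Sketch only: channel-based tree walk with a leaf-detection callback.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
)

type walkResult struct {
	entry string
	err   error
}

// startWalk walks root in a goroutine and streams entries over a channel.
// isLeaf decides whether a directory is reported as a single entry instead
// of being descended into.
func startWalk(root string, isLeaf func(path string) bool) <-chan walkResult {
	ch := make(chan walkResult)
	var walk func(dir string)
	walk = func(dir string) {
		entries, err := os.ReadDir(dir)
		if err != nil {
			ch <- walkResult{err: err}
			return
		}
		sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() })
		for _, e := range entries {
			full := filepath.Join(dir, e.Name())
			if e.IsDir() && !isLeaf(full) {
				walk(full) // descend into non-leaf directories
				continue
			}
			ch <- walkResult{entry: full} // leaf directory or regular file
		}
	}
	go func() {
		defer close(ch)
		walk(root)
	}()
	return ch
}

func main() {
	// Treat directories containing uploads.json as leaves, mirroring the
	// isLeaf callback wired into startTreeWalk in the diff above.
	results := startWalk(".", func(path string) bool {
		_, err := os.Stat(filepath.Join(path, "uploads.json"))
		return err == nil
	})
	for r := range results {
		if r.err != nil {
			fmt.Println("error:", r.err)
			continue
		}
		fmt.Println(r.entry)
	}
}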

xl-v1-multipart-common.go:
@@ -91,15 +91,17 @@ func (u uploadsV1) WriteTo(writer io.Writer) (n int64, err error) {
 	return int64(m), err
 }
 
-// getUploadIDs - get all the saved upload id's.
-func getUploadIDs(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) {
+// readUploadsJSON - get all the saved uploads JSON.
+func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) {
 	uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
 	var errs = make([]error, len(storageDisks))
 	var uploads = make([]uploadsV1, len(storageDisks))
 	var wg = &sync.WaitGroup{}
 
+	// Read `uploads.json` from all disks.
 	for index, disk := range storageDisks {
 		wg.Add(1)
+		// Read `uploads.json` in a routine.
 		go func(index int, disk StorageAPI) {
 			defer wg.Done()
 			r, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0))
@@ -116,8 +118,11 @@ func getUploadIDs(bucket, object string, storageDisks ...StorageAPI) (uploadIDs
 			errs[index] = nil
 		}(index, disk)
 	}
 
+	// Wait for all the routines.
 	wg.Wait()
+
+	// Return for first error.
 	for _, err = range errs {
 		if err != nil {
 			return uploadsV1{}, err
@@ -128,13 +133,16 @@ func getUploadIDs(bucket, object string, storageDisks ...StorageAPI) (uploadIDs
 	return uploads[0], nil
 }
 
-func updateUploadJSON(bucket, object string, uploadIDs uploadsV1, storageDisks ...StorageAPI) error {
+// uploadUploadsJSON - update `uploads.json` with new uploadsJSON for all disks.
+func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error {
 	uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
 	var errs = make([]error, len(storageDisks))
 	var wg = &sync.WaitGroup{}
 
+	// Update `uploads.json` for all the disks.
 	for index, disk := range storageDisks {
 		wg.Add(1)
+		// Update `uploads.json` in routine.
 		go func(index int, disk StorageAPI) {
 			defer wg.Done()
 			w, wErr := disk.CreateFile(minioMetaBucket, uploadsPath)
@@ -142,7 +150,7 @@ func updateUploadJSON(bucket, object string, uploadIDs uploadsV1, storageDisks .
 				errs[index] = wErr
 				return
 			}
-			_, wErr = uploadIDs.WriteTo(w)
+			_, wErr = uploadsJSON.WriteTo(w)
 			if wErr != nil {
 				errs[index] = wErr
 				return
@@ -158,8 +166,10 @@ func updateUploadJSON(bucket, object string, uploadIDs uploadsV1, storageDisks .
 		}(index, disk)
 	}
 
+	// Wait for all the routines to finish updating `uploads.json`
 	wg.Wait()
 
+	// Return for first error.
 	for _, err := range errs {
 		if err != nil {
 			return err
@@ -169,24 +179,44 @@ func updateUploadJSON(bucket, object string, uploadIDs uploadsV1, storageDisks .
 	return nil
 }
 
+// newUploadsV1 - initialize new uploads v1.
+func newUploadsV1(format string) uploadsV1 {
+	uploadIDs := uploadsV1{}
+	uploadIDs.Version = "1"
+	uploadIDs.Format = format
+	return uploadIDs
+}
+
 // writeUploadJSON - create `uploads.json` or update it with new uploadID.
-func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, storageDisks ...StorageAPI) error {
+func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, storageDisks ...StorageAPI) (err error) {
 	uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
 	tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadsJSONFile)
 
 	var errs = make([]error, len(storageDisks))
 	var wg = &sync.WaitGroup{}
 
-	uploadIDs, err := getUploadIDs(bucket, object, storageDisks...)
-	if err != nil && err != errFileNotFound {
-		return err
+	var uploadsJSON uploadsV1
+	uploadsJSON, err = readUploadsJSON(bucket, object, storageDisks...)
+	if err != nil {
+		// For any other errors.
+		if err != errFileNotFound {
+			return err
+		}
+		if len(storageDisks) == 1 {
+			// Set uploads format to `fs` for single disk.
+			uploadsJSON = newUploadsV1("fs")
+		} else {
+			// Set uploads format to `xl` otherwise.
+			uploadsJSON = newUploadsV1("xl")
+		}
 	}
-	uploadIDs.Version = "1"
-	uploadIDs.Format = "xl"
-	uploadIDs.AddUploadID(uploadID, initiated)
+	// Add a new upload id.
+	uploadsJSON.AddUploadID(uploadID, initiated)
 
+	// Update `uploads.json` on all disks.
 	for index, disk := range storageDisks {
 		wg.Add(1)
+		// Update `uploads.json` in a routine.
 		go func(index int, disk StorageAPI) {
 			defer wg.Done()
 			w, wErr := disk.CreateFile(minioMetaBucket, tmpUploadsPath)
@@ -194,7 +224,7 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
 				errs[index] = wErr
 				return
 			}
-			_, wErr = uploadIDs.WriteTo(w)
+			_, wErr = uploadsJSON.WriteTo(w)
 			if wErr != nil {
 				errs[index] = wErr
 				return
@@ -220,8 +250,10 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
 		}(index, disk)
 	}
 
+	// Wait for all the writes to finish.
 	wg.Wait()
 
+	// Return for first error encountered.
 	for _, err = range errs {
 		if err != nil {
 			return err
@@ -235,11 +267,17 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
 func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...StorageAPI) error {
 	var errs = make([]error, len(storageDisks))
 	var wg = &sync.WaitGroup{}
+
+	// Construct uploadIDPath.
+	uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
+
+	// Cleanup uploadID for all disks.
 	for index, disk := range storageDisks {
 		wg.Add(1)
+		// Cleanup each uploadID in a routine.
 		go func(index int, disk StorageAPI) {
 			defer wg.Done()
-			err := cleanupDir(disk, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID))
+			err := cleanupDir(disk, minioMetaBucket, uploadIDPath)
 			if err != nil {
 				errs[index] = err
 				return
@@ -247,8 +285,11 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora
 			errs[index] = nil
 		}(index, disk)
 	}
 
+	// Wait for all the cleanups to finish.
 	wg.Wait()
+
+	// Return first error.
 	for _, err := range errs {
 		if err != nil {
 			return err
@@ -257,49 +298,29 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora
 	return nil
 }
 
-// Returns if the prefix is a multipart upload.
-func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
-	disk := xl.getRandomDisk() // Choose a random disk.
-	_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
-	return err == nil
-}
-
-// listUploadsInfo - list all uploads info.
-func (xl xlObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) {
-	disk := xl.getRandomDisk() // Choose a random disk on each attempt.
-	splitPrefixes := strings.SplitN(prefixPath, "/", 3)
-	uploadIDs, err := getUploadIDs(splitPrefixes[1], splitPrefixes[2], disk)
-	if err != nil {
-		if err == errFileNotFound {
-			return []uploadInfo{}, nil
-		}
-		return nil, err
-	}
-	uploads = uploadIDs.Uploads
-	return uploads, nil
-}
-
-func (xl xlObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int) ([]uploadMetadata, bool, error) {
+// listMultipartUploadIDs - list all the upload ids from a marker up to 'count'.
+func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int, disk StorageAPI) ([]uploadMetadata, bool, error) {
 	var uploads []uploadMetadata
-	uploadsJSONContent, err := getUploadIDs(bucketName, objectName, xl.getRandomDisk())
+	// Read `uploads.json`.
+	uploadsJSON, err := readUploadsJSON(bucketName, objectName, disk)
 	if err != nil {
 		return nil, false, err
 	}
 	index := 0
 	if uploadIDMarker != "" {
-		for ; index < len(uploadsJSONContent.Uploads); index++ {
-			if uploadsJSONContent.Uploads[index].UploadID == uploadIDMarker {
+		for ; index < len(uploadsJSON.Uploads); index++ {
+			if uploadsJSON.Uploads[index].UploadID == uploadIDMarker {
 				// Skip the uploadID as it would already be listed in previous listing.
 				index++
 				break
 			}
 		}
 	}
-	for index < len(uploadsJSONContent.Uploads) {
+	for index < len(uploadsJSON.Uploads) {
 		uploads = append(uploads, uploadMetadata{
 			Object:    objectName,
-			UploadID:  uploadsJSONContent.Uploads[index].UploadID,
-			Initiated: uploadsJSONContent.Uploads[index].Initiated,
+			UploadID:  uploadsJSON.Uploads[index].UploadID,
+			Initiated: uploadsJSON.Uploads[index].Initiated,
 		})
 		count--
 		index++
@@ -307,7 +328,30 @@ func (xl xlObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarke
 			break
 		}
 	}
-	return uploads, index == len(uploadsJSONContent.Uploads), nil
+	end := (index == len(uploadsJSON.Uploads))
+	return uploads, end, nil
+}
+
+// Returns if the prefix is a multipart upload.
+func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
+	disk := xl.getRandomDisk() // Choose a random disk.
+	_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
+	return err == nil
+}
+
+// listUploadsInfo - list all uploads info.
+func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo, err error) {
+	disk := xl.getRandomDisk() // Choose a random disk on each attempt.
+	splitPrefixes := strings.SplitN(prefixPath, "/", 3)
+	uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk)
+	if err != nil {
+		if err == errFileNotFound {
+			return []uploadInfo{}, nil
+		}
+		return nil, err
+	}
+	uploadsInfo = uploadsJSON.Uploads
+	return uploadsInfo, nil
 }
 
 // listMultipartUploadsCommon - lists all multipart uploads, common
@@ -381,7 +425,7 @@ func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload
 	var err error
 	var eof bool
 	if uploadIDMarker != "" {
-		uploads, _, err = xl.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads)
+		uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, xl.getRandomDisk())
 		if err != nil {
 			return ListMultipartsInfo{}, err
 		}
@@ -422,15 +466,15 @@ func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload
 				}
 				continue
 			}
-			var tmpUploads []uploadMetadata
+			var newUploads []uploadMetadata
 			var end bool
 			uploadIDMarker = ""
-			tmpUploads, end, err = xl.listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads)
+			newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, xl.getRandomDisk())
 			if err != nil {
 				return ListMultipartsInfo{}, err
 			}
-			uploads = append(uploads, tmpUploads...)
-			maxUploads -= len(tmpUploads)
+			uploads = append(uploads, newUploads...)
+			maxUploads -= len(newUploads)
 			if walkResult.end && end {
 				eof = true
 				break
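
readUploadsJSON, updateUploadsJSON, and cleanupUploadedParts above all share the same fan-out shape: one goroutine per disk, errors collected into a per-index slice (so there is no shared-slice race), a sync.WaitGroup to join, and the first error wins. A minimal standalone sketch of that pattern (not the commit's code; the Disk interface below is an assumption standing in for StorageAPI):

// Sketch only: per-disk fan-out with indexed error collection.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Disk interface {
	WriteUploadsJSON(bucket, object string, data []byte) error
}

type okDisk struct{}

func (okDisk) WriteUploadsJSON(bucket, object string, data []byte) error { return nil }

type badDisk struct{}

func (badDisk) WriteUploadsJSON(bucket, object string, data []byte) error {
	return errors.New("disk offline")
}

// writeAll updates uploads.json on every disk in parallel and returns the
// first error encountered, mirroring the shape of updateUploadsJSON above.
func writeAll(disks []Disk, bucket, object string, data []byte) error {
	errs := make([]error, len(disks))
	var wg sync.WaitGroup
	for index, disk := range disks {
		wg.Add(1)
		go func(index int, disk Disk) {
			defer wg.Done()
			errs[index] = disk.WriteUploadsJSON(bucket, object, data)
		}(index, disk)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	err := writeAll([]Disk{okDisk{}, badDisk{}}, "bucket", "object", []byte(`{}`))
	fmt.Println(err) // disk offline
}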

xl-v1-multipart.go:
@@ -416,14 +416,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 
 	// Validate if there are other incomplete upload-id's present for
 	// the object, if yes do not attempt to delete 'uploads.json'.
-	uploadIDs, err := getUploadIDs(bucket, object, xl.storageDisks...)
+	uploadsJSON, err := readUploadsJSON(bucket, object, xl.storageDisks...)
 	if err == nil {
-		uploadIDIdx := uploadIDs.Index(uploadID)
+		uploadIDIdx := uploadsJSON.Index(uploadID)
 		if uploadIDIdx != -1 {
-			uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...)
+			uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...)
 		}
-		if len(uploadIDs.Uploads) > 0 {
-			if err = updateUploadJSON(bucket, object, uploadIDs, xl.storageDisks...); err != nil {
+		if len(uploadsJSON.Uploads) > 0 {
+			if err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...); err != nil {
 				return "", err
 			}
 			return s3MD5, nil
@@ -461,20 +461,21 @@ func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string)
 
 	// Cleanup all uploaded parts.
 	if err := cleanupUploadedParts(bucket, object, uploadID, xl.storageDisks...); err != nil {
-		return err
+		return toObjectErr(err, bucket, object)
 	}
 
 	// Validate if there are other incomplete upload-id's present for
 	// the object, if yes do not attempt to delete 'uploads.json'.
-	uploadIDs, err := getUploadIDs(bucket, object, xl.storageDisks...)
+	uploadsJSON, err := readUploadsJSON(bucket, object, xl.storageDisks...)
 	if err == nil {
-		uploadIDIdx := uploadIDs.Index(uploadID)
+		uploadIDIdx := uploadsJSON.Index(uploadID)
 		if uploadIDIdx != -1 {
-			uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...)
+			uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...)
 		}
-		if len(uploadIDs.Uploads) > 0 {
-			if err = updateUploadJSON(bucket, object, uploadIDs, xl.storageDisks...); err != nil {
-				return err
+		if len(uploadsJSON.Uploads) > 0 {
+			err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...)
+			if err != nil {
+				return toObjectErr(err, bucket, object)
 			}
 			return nil
 		}
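
The shared listMultipartUploadIDs helper introduced in this commit is plain marker-based pagination over the entries recorded in uploads.json: skip everything up to and including uploadIDMarker, then return at most count entries plus an end flag telling the caller whether the list is exhausted. A small self-contained sketch of that logic (assumed types and names, not the commit's code):

// Sketch only: marker/count pagination over a slice of upload entries.
package main

import "fmt"

type upload struct {
	UploadID  string
	Initiated string
}

// listAfterMarker skips past marker (which the previous page already
// returned), takes at most count entries, and reports whether the end of the
// list was reached.
func listAfterMarker(all []upload, marker string, count int) (page []upload, end bool) {
	index := 0
	if marker != "" {
		for ; index < len(all); index++ {
			if all[index].UploadID == marker {
				index++ // the marker itself was returned in the previous page
				break
			}
		}
	}
	for index < len(all) && count > 0 {
		page = append(page, all[index])
		index++
		count--
	}
	return page, index == len(all)
}

func main() {
	all := []upload{{"id-1", "t1"}, {"id-2", "t2"}, {"id-3", "t3"}}
	page, end := listAfterMarker(all, "id-1", 1)
	fmt.Println(page, end) // [{id-2 t2}] false
}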
