From b1e2b7dea28058d22389f7ab3eb7cb49c54487d4 Mon Sep 17 00:00:00 2001
From: Krishna Srinivas
Date: Wed, 25 May 2016 21:52:39 +0530
Subject: [PATCH] Fix list-incomplete uploads for XL.

---
 tree-walk-xl.go           |  43 ++-----
 xl-v1-list-objects.go     |  19 ++-
 xl-v1-multipart-common.go | 240 ++++++++++++++++++++++----------------
 3 files changed, 166 insertions(+), 136 deletions(-)

diff --git a/tree-walk-xl.go b/tree-walk-xl.go
index ea55c60c3..364e54425 100644
--- a/tree-walk-xl.go
+++ b/tree-walk-xl.go
@@ -18,7 +18,6 @@ package main
 
 import (
 	"math/rand"
-	"path"
 	"sort"
 	"strings"
 	"time"
@@ -34,9 +33,9 @@ type listParams struct {
 
 // Tree walk result carries results of tree walking.
 type treeWalkResult struct {
-	objInfo ObjectInfo
-	err     error
-	end     bool
+	entry string
+	err   error
+	end   bool
 }
 
 // Tree walk notify carries a channel which notifies tree walk
@@ -48,7 +47,7 @@ type treeWalker struct {
 }
 
 // listDir - listDir.
-func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool) (entries []string, err error) {
+func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) {
 	// Count for list errors encountered.
 	var listErrCount = 0
 
@@ -62,7 +61,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
 				entries[i] = ""
 				continue
 			}
-			if strings.HasSuffix(entry, slashSeparator) && xl.isObject(bucket, path.Join(prefixDir, entry)) {
+			if strings.HasSuffix(entry, slashSeparator) && isLeaf(bucket, pathJoin(prefixDir, entry)) {
 				entries[i] = strings.TrimSuffix(entry, slashSeparator)
 			}
 		}
@@ -90,25 +89,11 @@ func (xl xlObjects) getRandomDisk() (disk StorageAPI) {
 }
 
 // treeWalkXL walks directory tree recursively pushing fileInfo into the channel as and when it encounters files.
-func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int) bool {
+func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int, isLeaf func(string, string) bool) bool {
 	// Example:
 	// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
 	// called with prefixDir="one/two/three/four/" and marker="five.txt"
 
-	// Convert entry to FileInfo
-	entryToObjectInfo := func(entry string) (objInfo ObjectInfo, err error) {
-		if strings.HasSuffix(entry, slashSeparator) {
-			// Object name needs to be full path.
-			objInfo.Bucket = bucket
-			objInfo.Name = path.Join(prefixDir, entry)
-			objInfo.Name += slashSeparator
-			objInfo.IsDir = true
-			return objInfo, nil
-		}
-		// Set the Mode to a "regular" file.
-		return xl.getObjectInfo(bucket, path.Join(prefixDir, entry))
-	}
-
 	var markerBase, markerDir string
 	if marker != "" {
 		// Ex: if marker="four/five.txt", markerDir="four/" markerBase="five.txt"
@@ -121,7 +106,7 @@ func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker strin
 	}
 	entries, err := xl.listDir(bucket, prefixDir, func(entry string) bool {
 		return !strings.HasPrefix(entry, entryPrefixMatch)
-	})
+	}, isLeaf)
 	if err != nil {
 		send(treeWalkResult{err: err})
 		return false
@@ -166,19 +151,13 @@ func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker strin
 			}
 			*count--
 			prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories.
-			if !xl.treeWalkXL(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count) {
+			if !xl.treeWalkXL(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) {
 				return false
 			}
 			continue
 		}
 		*count--
-		objInfo, err := entryToObjectInfo(entry)
-		if err != nil {
-			// The file got deleted in the interim between ListDir() and StatFile()
-			// Ignore error and continue.
-			continue
-		}
-		if !send(treeWalkResult{objInfo: objInfo}) {
+		if !send(treeWalkResult{entry: pathJoin(prefixDir, entry)}) {
 			return false
 		}
 	}
@@ -186,7 +165,7 @@ func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker strin
 }
 
 // Initiate a new treeWalk in a goroutine.
-func (xl xlObjects) startTreeWalkXL(bucket, prefix, marker string, recursive bool) *treeWalker {
+func (xl xlObjects) startTreeWalkXL(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalker {
 	// Example 1
 	// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
 	// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
@@ -223,7 +202,7 @@ func (xl xlObjects) startTreeWalkXL(bucket, prefix, marker string, recursive boo
 				return false
 			}
 		}
-		xl.treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count)
+		xl.treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf)
 	}()
 	return &walkNotify
 }
diff --git a/xl-v1-list-objects.go b/xl-v1-list-objects.go
index eb446bfd9..ef59cee06 100644
--- a/xl-v1-list-objects.go
+++ b/xl-v1-list-objects.go
@@ -11,7 +11,7 @@ func (xl xlObjects) listObjectsXL(bucket, prefix, marker, delimiter string, maxK
 
 	walker := xl.lookupTreeWalkXL(listParams{bucket, recursive, marker, prefix})
 	if walker == nil {
-		walker = xl.startTreeWalkXL(bucket, prefix, marker, recursive)
+		walker = xl.startTreeWalkXL(bucket, prefix, marker, recursive, xl.isObject)
 	}
 	var objInfos []ObjectInfo
 	var eof bool
@@ -31,7 +31,22 @@ func (xl xlObjects) listObjectsXL(bucket, prefix, marker, delimiter string, maxK
 			}
 			return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix)
 		}
-		objInfo := walkResult.objInfo
+		entry := walkResult.entry
+		var objInfo ObjectInfo
+		if strings.HasSuffix(entry, slashSeparator) {
+			// Object name needs to be full path.
+			objInfo.Bucket = bucket
+			objInfo.Name = entry
+			objInfo.IsDir = true
+		} else {
+			// Set the Mode to a "regular" file.
+			var err error
+			objInfo, err = xl.getObjectInfo(bucket, entry)
+			if err != nil {
+				return ListObjectsInfo{}, toObjectErr(err, bucket, prefix)
+			}
+		}
+
 		nextMarker = objInfo.Name
 		objInfos = append(objInfos, objInfo)
 		if walkResult.end {
diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go
index 8c39fc602..314e3e9e3 100644
--- a/xl-v1-multipart-common.go
+++ b/xl-v1-multipart-common.go
@@ -48,7 +48,7 @@ type byInitiatedTime []uploadInfo
 func (t byInitiatedTime) Len() int      { return len(t) }
 func (t byInitiatedTime) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
 func (t byInitiatedTime) Less(i, j int) bool {
-	return t[i].Initiated.After(t[j].Initiated)
+	return t[i].Initiated.Before(t[j].Initiated)
 }
 
 // AddUploadID - adds a new upload id in order of its initiated time.
@@ -257,6 +257,49 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora
 	return nil
 }
 
+// Returns if the prefix is a multipart upload.
+func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
+	// Create errs and volInfo slices of storageDisks size.
+	var errs = make([]error, len(xl.storageDisks))
+
+	// Allocate a new waitgroup.
+	var wg = &sync.WaitGroup{}
+	for index, disk := range xl.storageDisks {
+		wg.Add(1)
+		// Stat file on all the disks in a routine.
+		go func(index int, disk StorageAPI) {
+			defer wg.Done()
+			_, err := disk.StatFile(bucket, path.Join(prefix, uploadsJSONFile))
+			if err != nil {
+				errs[index] = err
+				return
+			}
+			errs[index] = nil
+		}(index, disk)
+	}
+
+	// Wait for all the Stat operations to finish.
+	wg.Wait()
+
+	var errFileNotFoundCount int
+	for _, err := range errs {
+		if err != nil {
+			if err == errFileNotFound {
+				errFileNotFoundCount++
+				// If we have errors with file not found greater than allowed read
+				// quorum we return err as errFileNotFound.
+				if errFileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
+					return false
+				}
+				continue
+			}
+			errorIf(err, "Unable to access file "+path.Join(bucket, prefix))
+			return false
+		}
+	}
+	return true
+}
+
 // listUploadsInfo - list all uploads info.
 func (xl xlObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) {
 	disk := xl.getRandomDisk() // Choose a random disk on each attempt.
@@ -272,95 +315,37 @@ func (xl xlObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, er
 	return uploads, nil
 }
 
-// listMetaBucketMultipart - list all objects at a given prefix inside minioMetaBucket.
-func (xl xlObjects) listMetaBucketMultipart(prefixPath string, markerPath string, recursive bool, maxKeys int) (objInfos []ObjectInfo, eof bool, err error) {
-	walker := xl.lookupTreeWalkXL(listParams{minioMetaBucket, recursive, markerPath, prefixPath})
-	if walker == nil {
-		walker = xl.startTreeWalkXL(minioMetaBucket, prefixPath, markerPath, recursive)
+func (xl xlObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int) ([]uploadMetadata, bool, error) {
+	var uploads []uploadMetadata
+	uploadsJSONContent, err := getUploadIDs(bucketName, objectName, xl.getRandomDisk())
+	if err != nil {
+		return nil, false, err
 	}
-
-	// newMaxKeys tracks the size of entries which are going to be
-	// returned back.
-	var newMaxKeys int
-
-	// Following loop gathers and filters out special files inside minio meta volume.
-	for {
-		walkResult, ok := <-walker.ch
-		if !ok {
-			// Closed channel.
-			eof = true
-			break
-		}
-		// For any walk error return right away.
-		if walkResult.err != nil {
-			// File not found or Disk not found is a valid case.
-			if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound {
-				return nil, true, nil
-			}
-			return nil, false, toObjectErr(walkResult.err, minioMetaBucket, prefixPath)
-		}
-		objInfo := walkResult.objInfo
-		var uploads []uploadInfo
-		if objInfo.IsDir {
-			// List all the entries if fi.Name is a leaf directory, if
-			// fi.Name is not a leaf directory then the resulting
-			// entries are empty.
-			uploads, err = xl.listUploadsInfo(objInfo.Name)
-			if err != nil {
-				return nil, false, err
-			}
-		}
-		if len(uploads) > 0 {
-			for _, upload := range uploads {
-				objInfos = append(objInfos, ObjectInfo{
-					Name:    path.Join(objInfo.Name, upload.UploadID),
-					ModTime: upload.Initiated,
-				})
-				newMaxKeys++
-				// If we have reached the maxKeys, it means we have listed
-				// everything that was requested.
-				if newMaxKeys == maxKeys {
-					break
-				}
-			}
-		} else {
-			// We reach here for a non-recursive case non-leaf entry
-			// OR recursive case with fi.Name.
-			if !objInfo.IsDir { // Do not skip non-recursive case directory entries.
-				// Validate if 'fi.Name' is incomplete multipart.
-				if !strings.HasSuffix(objInfo.Name, xlMetaJSONFile) {
-					continue
-				}
-				objInfo.Name = path.Dir(objInfo.Name)
-			}
-			objInfos = append(objInfos, objInfo)
-			newMaxKeys++
-			// If we have reached the maxKeys, it means we have listed
-			// everything that was requested.
-			if newMaxKeys == maxKeys {
+	index := 0
+	if uploadIDMarker != "" {
+		for ; index < len(uploadsJSONContent.Uploads); index++ {
+			if uploadsJSONContent.Uploads[index].UploadID == uploadIDMarker {
+				// Skip the uploadID as it would already be listed in previous listing.
+				index++
 				break
 			}
 		}
 	}
-
-	if !eof && len(objInfos) != 0 {
-		// EOF has not reached, hence save the walker channel to the map so that the walker go routine
-		// can continue from where it left off for the next list request.
-		lastObjInfo := objInfos[len(objInfos)-1]
-		markerPath = lastObjInfo.Name
-		xl.saveTreeWalkXL(listParams{minioMetaBucket, recursive, markerPath, prefixPath}, walker)
+	for index < len(uploadsJSONContent.Uploads) {
+		uploads = append(uploads, uploadMetadata{
+			Object:    objectName,
+			UploadID:  uploadsJSONContent.Uploads[index].UploadID,
+			Initiated: uploadsJSONContent.Uploads[index].Initiated,
+		})
+		count--
+		index++
+		if count == 0 {
+			break
+		}
 	}
-
-	// Return entries here.
-	return objInfos, eof, nil
+	return uploads, index == len(uploadsJSONContent.Uploads), nil
 }
 
-// FIXME: Currently the code sorts based on keyName/upload-id which is
-// not correct based on the S3 specs. According to s3 specs we are
-// supposed to only lexically sort keyNames and then for keyNames with
-// multiple upload ids should be sorted based on the initiated time.
-// Currently this case is not handled.
-
 // listMultipartUploadsCommon - lists all multipart uploads, common
 // function for both object layers.
 func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
@@ -413,9 +398,12 @@ func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload
 		result.IsTruncated = true
 	}
 	result.MaxUploads = maxUploads
+	result.KeyMarker = keyMarker
+	result.Prefix = prefix
+	result.Delimiter = delimiter
 
 	// Not using path.Join() as it strips off the trailing '/'.
-	multipartPrefixPath := pathJoin(mpartMetaPrefix, pathJoin(bucket, prefix))
+	multipartPrefixPath := pathJoin(mpartMetaPrefix, bucket, prefix)
 	if prefix == "" {
 		// Should have a trailing "/" if prefix is ""
 		// For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is ""
@@ -423,33 +411,81 @@ func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload
 	}
 	multipartMarkerPath := ""
 	if keyMarker != "" {
-		keyMarkerPath := pathJoin(pathJoin(bucket, keyMarker), uploadIDMarker)
-		multipartMarkerPath = pathJoin(mpartMetaPrefix, keyMarkerPath)
+		multipartMarkerPath = pathJoin(mpartMetaPrefix, bucket, keyMarker)
 	}
-
-	// List all the multipart files at prefixPath, starting with marker keyMarkerPath.
-	objInfos, eof, err := xl.listMetaBucketMultipart(multipartPrefixPath, multipartMarkerPath, recursive, maxUploads)
-	if err != nil {
-		return ListMultipartsInfo{}, err
+	var uploads []uploadMetadata
+	var err error
+	var eof bool
+	if uploadIDMarker != "" {
+		uploads, _, err = xl.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads)
+		if err != nil {
+			return ListMultipartsInfo{}, err
+		}
+		maxUploads = maxUploads - len(uploads)
 	}
-
-	// Loop through all the received files fill in the multiparts result.
-	for _, objInfo := range objInfos {
+	if maxUploads > 0 {
+		walker := xl.lookupTreeWalkXL(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
+		if walker == nil {
+			walker = xl.startTreeWalkXL(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload)
+		}
+		for maxUploads > 0 {
+			walkResult, ok := <-walker.ch
+			if !ok {
+				// Closed channel.
+				eof = true
+				break
+			}
+			// For any walk error return right away.
+			if walkResult.err != nil {
+				// File not found or Disk not found is a valid case.
+				if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound {
+					eof = true
+					break
+				}
+				return ListMultipartsInfo{}, walkResult.err
+			}
+			entry := strings.TrimPrefix(walkResult.entry, retainSlash(pathJoin(mpartMetaPrefix, bucket)))
+			if strings.HasSuffix(walkResult.entry, slashSeparator) {
+				uploads = append(uploads, uploadMetadata{
+					Object: entry,
+				})
+				maxUploads--
+				if maxUploads == 0 {
+					if walkResult.end {
+						eof = true
+						break
+					}
+				}
+				continue
+			}
+			var tmpUploads []uploadMetadata
+			var end bool
+			uploadIDMarker = ""
+			tmpUploads, end, err = xl.listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads)
+			if err != nil {
+				return ListMultipartsInfo{}, err
+			}
+			uploads = append(uploads, tmpUploads...)
+			maxUploads -= len(tmpUploads)
+			if walkResult.end && end {
+				eof = true
+				break
+			}
+		}
+	}
+	// Loop through all the received uploads fill in the multiparts result.
+	for _, upload := range uploads {
 		var objectName string
 		var uploadID string
-		if objInfo.IsDir {
+		if strings.HasSuffix(upload.Object, slashSeparator) {
 			// All directory entries are common prefixes.
 			uploadID = "" // Upload ids are empty for CommonPrefixes.
-			objectName = strings.TrimPrefix(objInfo.Name, retainSlash(pathJoin(mpartMetaPrefix, bucket)))
+			objectName = upload.Object
 			result.CommonPrefixes = append(result.CommonPrefixes, objectName)
 		} else {
-			uploadID = path.Base(objInfo.Name)
-			objectName = strings.TrimPrefix(path.Dir(objInfo.Name), retainSlash(pathJoin(mpartMetaPrefix, bucket)))
-			result.Uploads = append(result.Uploads, uploadMetadata{
-				Object:    objectName,
-				UploadID:  uploadID,
-				Initiated: objInfo.ModTime,
-			})
+			uploadID = upload.UploadID
+			objectName = upload.Object
+			result.Uploads = append(result.Uploads, upload)
 		}
 		result.NextKeyMarker = objectName
 		result.NextUploadIDMarker = uploadID