diff --git a/bucket-handlers.go b/bucket-handlers.go index 644be8c90..66967ddae 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -179,8 +179,22 @@ func (api objectStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, r writeErrorResponse(w, r, ErrInvalidMaxUploads, r.URL.Path) return } - if maxUploads == 0 { - maxUploads = maxObjectList + if keyMarker != "" { + // Unescape keyMarker string + keyMarkerUnescaped, e := url.QueryUnescape(keyMarker) + if e != nil { + if e != nil { + // Return 'NoSuchKey' to indicate invalid marker key. + writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path) + return + } + keyMarker = keyMarkerUnescaped + // Marker not common with prefix is not implemented. + if !strings.HasPrefix(keyMarker, prefix) { + writeErrorResponse(w, r, ErrNotImplemented, r.URL.Path) + return + } + } } listMultipartsInfo, err := api.ObjectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) diff --git a/dir.go b/fs-multipart-dir.go similarity index 99% rename from dir.go rename to fs-multipart-dir.go index 3f8239193..36a953fe2 100644 --- a/dir.go +++ b/fs-multipart-dir.go @@ -135,7 +135,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, timeoutCh := make(chan struct{}, 1) // TODO: check if bucketDir is absolute path - scanDir := bucketDir dirDepth := bucketDir @@ -145,7 +144,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, if strings.HasSuffix(prefixPath, string(os.PathSeparator)) { tmpPrefixPath += string(os.PathSeparator) } - prefixPath = tmpPrefixPath } @@ -375,7 +373,6 @@ func (oic multipartObjectInfoChannel) IsClosed() bool { if oic.objInfo != nil { return false } - return oic.closed } diff --git a/fs-multipart.go b/fs-multipart.go index 071cc270e..f70f5625a 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "io/ioutil" - "net/url" "os" "path/filepath" "strconv" @@ -421,6 +420,42 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa return newObject, nil } +func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { + fs.listMultipartObjectMapMutex.Lock() + defer fs.listMultipartObjectMapMutex.Unlock() + + channels := []multipartObjectInfoChannel{ch} + if _, ok := fs.listMultipartObjectMap[params]; ok { + channels = append(fs.listMultipartObjectMap[params], ch) + } + + fs.listMultipartObjectMap[params] = channels +} + +func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel { + fs.listMultipartObjectMapMutex.Lock() + defer fs.listMultipartObjectMapMutex.Unlock() + + if channels, ok := fs.listMultipartObjectMap[params]; ok { + for i, channel := range channels { + if !channel.IsTimedOut() { + chs := channels[i+1:] + if len(chs) > 0 { + fs.listMultipartObjectMap[params] = chs + } else { + delete(fs.listMultipartObjectMap, params) + } + + return &channel + } + } + + // As all channels are timed out, delete the map entry + delete(fs.listMultipartObjectMap, params) + } + return nil +} + // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) { result := ListMultipartsInfo{} @@ -442,13 +477,6 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter)) } - // Unescape keyMarker string - if tmpKeyMarker, err := url.QueryUnescape(keyMarker); err == nil { - keyMarker = tmpKeyMarker - } else { - return result, probe.NewError(err) - } - if keyMarker != "" && !strings.HasPrefix(keyMarker, objectPrefix) { return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", keyMarker, objectPrefix)) } @@ -490,48 +518,48 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa bucketDir := filepath.Join(fs.path, bucket) // If listMultipartObjectChannel is available for given parameters, then use it, else create new one - objectInfoCh := fs.popListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker}) - if objectInfoCh == nil { + multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker}) + if multipartObjectInfoCh == nil { ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) - objectInfoCh = &ch + multipartObjectInfoCh = &ch } nextKeyMarker := "" nextUploadIDMarker := "" for i := 0; i < maxUploads; { - objInfo, ok := objectInfoCh.Read() + multipartObjInfo, ok := multipartObjectInfoCh.Read() if !ok { // Closed channel. return result, nil } - if objInfo.Err != nil { - return ListMultipartsInfo{}, probe.NewError(objInfo.Err) + if multipartObjInfo.Err != nil { + return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err) } - if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") { + if strings.Contains(multipartObjInfo.Name, "$multiparts") || strings.Contains(multipartObjInfo.Name, "$tmpobject") { continue } - if objInfo.IsDir && skipDir { + if multipartObjInfo.IsDir && skipDir { continue } - if objInfo.IsDir { - result.CommonPrefixes = append(result.CommonPrefixes, objInfo.Name) + if multipartObjInfo.IsDir { + result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name) } else { - result.Uploads = append(result.Uploads, uploadMetadata{Object: objInfo.Name, UploadID: objInfo.UploadID, Initiated: objInfo.ModifiedTime}) + result.Uploads = append(result.Uploads, uploadMetadata{Object: multipartObjInfo.Name, UploadID: multipartObjInfo.UploadID, Initiated: multipartObjInfo.ModifiedTime}) } - nextKeyMarker = objInfo.Name - nextUploadIDMarker = objInfo.UploadID + nextKeyMarker = multipartObjInfo.Name + nextUploadIDMarker = multipartObjInfo.UploadID i++ } - if !objectInfoCh.IsClosed() { + if !multipartObjectInfoCh.IsClosed() { result.IsTruncated = true result.NextKeyMarker = nextKeyMarker result.NextUploadIDMarker = nextUploadIDMarker - fs.pushListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *objectInfoCh) + fs.saveListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *multipartObjectInfoCh) } return result, nil diff --git a/fs.go b/fs.go index 386725e4a..2f639eb94 100644 --- a/fs.go +++ b/fs.go @@ -50,43 +50,6 @@ type Filesystem struct { listMultipartObjectMapMutex *sync.Mutex } -func (fs *Filesystem) pushListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { - fs.listMultipartObjectMapMutex.Lock() - defer fs.listMultipartObjectMapMutex.Unlock() - - channels := []multipartObjectInfoChannel{ch} - if _, ok := fs.listMultipartObjectMap[params]; ok { - channels = append(fs.listMultipartObjectMap[params], ch) - } - - fs.listMultipartObjectMap[params] = channels -} - -func (fs *Filesystem) popListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel { - fs.listMultipartObjectMapMutex.Lock() - defer fs.listMultipartObjectMapMutex.Unlock() - - if channels, ok := fs.listMultipartObjectMap[params]; ok { - for i, channel := range channels { - if !channel.IsTimedOut() { - chs := channels[i+1:] - if len(chs) > 0 { - fs.listMultipartObjectMap[params] = chs - } else { - delete(fs.listMultipartObjectMap, params) - } - - return &channel - } - } - - // As all channels are timed out, delete the map entry - delete(fs.listMultipartObjectMap, params) - } - - return nil -} - // newFS instantiate a new filesystem. func newFS(rootPath string) (ObjectAPI, *probe.Error) { fs := &Filesystem{