Move the files and rename some functions.

- Rename dir.go as 'fs-multipart-dir.go'
- Move the push/pop to fs-multipart.go and rename them as save/lookup.
- Rename objectInfo instances in fs-multipart as multipartObjInfo.
master
Harshavardhana 9 years ago
parent 9632c94e7a
commit 3fcc60de91
  1. 18
      bucket-handlers.go
  2. 3
      fs-multipart-dir.go
  3. 74
      fs-multipart.go
  4. 37
      fs.go

@ -179,8 +179,22 @@ func (api objectStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, r
writeErrorResponse(w, r, ErrInvalidMaxUploads, r.URL.Path) writeErrorResponse(w, r, ErrInvalidMaxUploads, r.URL.Path)
return return
} }
if maxUploads == 0 { if keyMarker != "" {
maxUploads = maxObjectList // Unescape keyMarker string
keyMarkerUnescaped, e := url.QueryUnescape(keyMarker)
if e != nil {
if e != nil {
// Return 'NoSuchKey' to indicate invalid marker key.
writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path)
return
}
keyMarker = keyMarkerUnescaped
// Marker not common with prefix is not implemented.
if !strings.HasPrefix(keyMarker, prefix) {
writeErrorResponse(w, r, ErrNotImplemented, r.URL.Path)
return
}
}
} }
listMultipartsInfo, err := api.ObjectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) listMultipartsInfo, err := api.ObjectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)

@ -135,7 +135,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
timeoutCh := make(chan struct{}, 1) timeoutCh := make(chan struct{}, 1)
// TODO: check if bucketDir is absolute path // TODO: check if bucketDir is absolute path
scanDir := bucketDir scanDir := bucketDir
dirDepth := bucketDir dirDepth := bucketDir
@ -145,7 +144,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
if strings.HasSuffix(prefixPath, string(os.PathSeparator)) { if strings.HasSuffix(prefixPath, string(os.PathSeparator)) {
tmpPrefixPath += string(os.PathSeparator) tmpPrefixPath += string(os.PathSeparator)
} }
prefixPath = tmpPrefixPath prefixPath = tmpPrefixPath
} }
@ -375,7 +373,6 @@ func (oic multipartObjectInfoChannel) IsClosed() bool {
if oic.objInfo != nil { if oic.objInfo != nil {
return false return false
} }
return oic.closed return oic.closed
} }

@ -23,7 +23,6 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/url"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -421,6 +420,42 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
return newObject, nil return newObject, nil
} }
func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) {
fs.listMultipartObjectMapMutex.Lock()
defer fs.listMultipartObjectMapMutex.Unlock()
channels := []multipartObjectInfoChannel{ch}
if _, ok := fs.listMultipartObjectMap[params]; ok {
channels = append(fs.listMultipartObjectMap[params], ch)
}
fs.listMultipartObjectMap[params] = channels
}
func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel {
fs.listMultipartObjectMapMutex.Lock()
defer fs.listMultipartObjectMapMutex.Unlock()
if channels, ok := fs.listMultipartObjectMap[params]; ok {
for i, channel := range channels {
if !channel.IsTimedOut() {
chs := channels[i+1:]
if len(chs) > 0 {
fs.listMultipartObjectMap[params] = chs
} else {
delete(fs.listMultipartObjectMap, params)
}
return &channel
}
}
// As all channels are timed out, delete the map entry
delete(fs.listMultipartObjectMap, params)
}
return nil
}
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) { func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) {
result := ListMultipartsInfo{} result := ListMultipartsInfo{}
@ -442,13 +477,6 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter)) return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter))
} }
// Unescape keyMarker string
if tmpKeyMarker, err := url.QueryUnescape(keyMarker); err == nil {
keyMarker = tmpKeyMarker
} else {
return result, probe.NewError(err)
}
if keyMarker != "" && !strings.HasPrefix(keyMarker, objectPrefix) { if keyMarker != "" && !strings.HasPrefix(keyMarker, objectPrefix) {
return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", keyMarker, objectPrefix)) return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", keyMarker, objectPrefix))
} }
@ -490,48 +518,48 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
bucketDir := filepath.Join(fs.path, bucket) bucketDir := filepath.Join(fs.path, bucket)
// If listMultipartObjectChannel is available for given parameters, then use it, else create new one // If listMultipartObjectChannel is available for given parameters, then use it, else create new one
objectInfoCh := fs.popListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker}) multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker})
if objectInfoCh == nil { if multipartObjectInfoCh == nil {
ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive)
objectInfoCh = &ch multipartObjectInfoCh = &ch
} }
nextKeyMarker := "" nextKeyMarker := ""
nextUploadIDMarker := "" nextUploadIDMarker := ""
for i := 0; i < maxUploads; { for i := 0; i < maxUploads; {
objInfo, ok := objectInfoCh.Read() multipartObjInfo, ok := multipartObjectInfoCh.Read()
if !ok { if !ok {
// Closed channel. // Closed channel.
return result, nil return result, nil
} }
if objInfo.Err != nil { if multipartObjInfo.Err != nil {
return ListMultipartsInfo{}, probe.NewError(objInfo.Err) return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err)
} }
if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") { if strings.Contains(multipartObjInfo.Name, "$multiparts") || strings.Contains(multipartObjInfo.Name, "$tmpobject") {
continue continue
} }
if objInfo.IsDir && skipDir { if multipartObjInfo.IsDir && skipDir {
continue continue
} }
if objInfo.IsDir { if multipartObjInfo.IsDir {
result.CommonPrefixes = append(result.CommonPrefixes, objInfo.Name) result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name)
} else { } else {
result.Uploads = append(result.Uploads, uploadMetadata{Object: objInfo.Name, UploadID: objInfo.UploadID, Initiated: objInfo.ModifiedTime}) result.Uploads = append(result.Uploads, uploadMetadata{Object: multipartObjInfo.Name, UploadID: multipartObjInfo.UploadID, Initiated: multipartObjInfo.ModifiedTime})
} }
nextKeyMarker = objInfo.Name nextKeyMarker = multipartObjInfo.Name
nextUploadIDMarker = objInfo.UploadID nextUploadIDMarker = multipartObjInfo.UploadID
i++ i++
} }
if !objectInfoCh.IsClosed() { if !multipartObjectInfoCh.IsClosed() {
result.IsTruncated = true result.IsTruncated = true
result.NextKeyMarker = nextKeyMarker result.NextKeyMarker = nextKeyMarker
result.NextUploadIDMarker = nextUploadIDMarker result.NextUploadIDMarker = nextUploadIDMarker
fs.pushListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *objectInfoCh) fs.saveListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *multipartObjectInfoCh)
} }
return result, nil return result, nil

37
fs.go

@ -50,43 +50,6 @@ type Filesystem struct {
listMultipartObjectMapMutex *sync.Mutex listMultipartObjectMapMutex *sync.Mutex
} }
func (fs *Filesystem) pushListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) {
fs.listMultipartObjectMapMutex.Lock()
defer fs.listMultipartObjectMapMutex.Unlock()
channels := []multipartObjectInfoChannel{ch}
if _, ok := fs.listMultipartObjectMap[params]; ok {
channels = append(fs.listMultipartObjectMap[params], ch)
}
fs.listMultipartObjectMap[params] = channels
}
func (fs *Filesystem) popListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel {
fs.listMultipartObjectMapMutex.Lock()
defer fs.listMultipartObjectMapMutex.Unlock()
if channels, ok := fs.listMultipartObjectMap[params]; ok {
for i, channel := range channels {
if !channel.IsTimedOut() {
chs := channels[i+1:]
if len(chs) > 0 {
fs.listMultipartObjectMap[params] = chs
} else {
delete(fs.listMultipartObjectMap, params)
}
return &channel
}
}
// As all channels are timed out, delete the map entry
delete(fs.listMultipartObjectMap, params)
}
return nil
}
// newFS instantiate a new filesystem. // newFS instantiate a new filesystem.
func newFS(rootPath string) (ObjectAPI, *probe.Error) { func newFS(rootPath string) (ObjectAPI, *probe.Error) {
fs := &Filesystem{ fs := &Filesystem{

Loading…
Cancel
Save