From 9632c94e7aa8d1e9514003b83c17ca83ac3d85f2 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 5 Apr 2016 11:24:23 -0700 Subject: [PATCH 1/3] Fix list objects test and remove all the old unnecessary files. - Fix tests for new changes. - Change Golang err as 'e' for the time being, before we bring in probe removal change. - Remove old structs and temporary files. --- fs-backend-metadata.go | 56 ------------ fs-bucket-listobjects.go | 15 ++-- fs-multipart.go | 190 +++++++++++++++++++-------------------- fs.go | 40 --------- fs_api_suite_test.go | 2 +- 5 files changed, 101 insertions(+), 202 deletions(-) delete mode 100644 fs-backend-metadata.go diff --git a/fs-backend-metadata.go b/fs-backend-metadata.go deleted file mode 100644 index 100ef3df9..000000000 --- a/fs-backend-metadata.go +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2015, 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "github.com/minio/minio/pkg/probe" - "github.com/minio/minio/pkg/quick" -) - -var multipartsMetadataPath string - -// SetFSMultipartsMetadataPath - set custom multiparts session metadata path. -func setFSMultipartsMetadataPath(metadataPath string) { - multipartsMetadataPath = metadataPath -} - -// saveMultipartsSession - save multiparts. 
-func saveMultipartsSession(mparts multiparts) *probe.Error { - qc, err := quick.New(mparts) - if err != nil { - return err.Trace() - } - if err := qc.Save(multipartsMetadataPath); err != nil { - return err.Trace() - } - return nil -} - -// loadMultipartsSession load multipart session file. -func loadMultipartsSession() (*multiparts, *probe.Error) { - mparts := &multiparts{} - mparts.Version = "1" - mparts.ActiveSession = make(map[string]*multipartSession) - qc, err := quick.New(mparts) - if err != nil { - return nil, err.Trace() - } - if err := qc.Load(multipartsMetadataPath); err != nil { - return nil, err.Trace() - } - return qc.Data().(*multiparts), nil -} diff --git a/fs-bucket-listobjects.go b/fs-bucket-listobjects.go index 10375e57f..6e55f3b8b 100644 --- a/fs-bucket-listobjects.go +++ b/fs-bucket-listobjects.go @@ -32,15 +32,13 @@ const ( // isDirExist - returns whether given directory is exist or not. func isDirExist(dirname string) (bool, error) { - fi, err := os.Lstat(dirname) - if err != nil { - if os.IsNotExist(err) { + fi, e := os.Lstat(dirname) + if e != nil { + if os.IsNotExist(e) { return false, nil } - - return false, err + return false, e } - return fi.IsDir(), nil } @@ -129,9 +127,8 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe // Verify if prefix exists. prefixDir := filepath.Dir(filepath.FromSlash(prefix)) rootDir := filepath.Join(bucketDir, prefixDir) - _, e := isDirExist(rootDir) - if e != nil { - if os.IsNotExist(e) { + if status, e := isDirExist(rootDir); !status { + if e == nil { // Prefix does not exist, not an error just respond empty // list response. 
return result, nil diff --git a/fs-multipart.go b/fs-multipart.go index 6a8396e56..071cc270e 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -39,19 +39,19 @@ const configDir = ".minio" const uploadIDSuffix = ".uploadid" func removeFileTree(fileName string, level string) error { - if err := os.Remove(fileName); err != nil { - return err + if e := os.Remove(fileName); e != nil { + return e } for fileDir := filepath.Dir(fileName); fileDir > level; fileDir = filepath.Dir(fileDir) { - if status, err := isDirEmpty(fileDir); err != nil { - return err + if status, e := isDirEmpty(fileDir); e != nil { + return e } else if !status { break } - if err := os.Remove(fileDir); err != nil { - return err + if e := os.Remove(fileDir); e != nil { + return e } } @@ -59,17 +59,17 @@ func removeFileTree(fileName string, level string) error { } func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error { - tempFile, err := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-") - if err != nil { - return err + tempFile, e := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-") + if e != nil { + return e } md5Hasher := md5.New() multiWriter := io.MultiWriter(md5Hasher, tempFile) - if _, err := io.CopyN(multiWriter, data, size); err != nil { + if _, e := io.CopyN(multiWriter, data, size); e != nil { tempFile.Close() os.Remove(tempFile.Name()) - return err + return e } tempFile.Close() @@ -79,22 +79,22 @@ func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error return BadDigest{ExpectedMD5: md5sum, CalculatedMD5: dataMd5sum} } - if err := os.Rename(tempFile.Name(), fileName); err != nil { + if e := os.Rename(tempFile.Name(), fileName); e != nil { os.Remove(tempFile.Name()) - return err + return e } return nil } func isFileExist(filename string) (bool, error) { - fi, err := os.Lstat(filename) - if err != nil { - if os.IsNotExist(err) { + fi, e := os.Lstat(filename) + if e != nil { + if os.IsNotExist(e) { return 
false, nil } - return false, err + return false, e } return fi.Mode().IsRegular(), nil @@ -120,30 +120,30 @@ func (fs Filesystem) newUploadID(bucket, object string) (string, error) { metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) // create metaObjectDir if not exist - if status, err := isDirExist(metaObjectDir); err != nil { - return "", err + if status, e := isDirExist(metaObjectDir); e != nil { + return "", e } else if !status { - if err := os.MkdirAll(metaObjectDir, 0755); err != nil { - return "", err + if e := os.MkdirAll(metaObjectDir, 0755); e != nil { + return "", e } } for { - uuid, err := uuid.New() - if err != nil { - return "", err + uuid, e := uuid.New() + if e != nil { + return "", e } uploadID := uuid.String() uploadIDFile := filepath.Join(metaObjectDir, uploadID+uploadIDSuffix) - if _, err := os.Lstat(uploadIDFile); err != nil { - if !os.IsNotExist(err) { - return "", err + if _, e := os.Lstat(uploadIDFile); e != nil { + if !os.IsNotExist(e) { + return "", e } // uploadIDFile doesn't exist, so create empty file to reserve the name - if err := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); err != nil { - return "", err + if e := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); e != nil { + return "", e } return uploadID, nil @@ -161,32 +161,32 @@ func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error { metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) uploadIDPrefix := uploadID + "." 
- names, err := filteredReaddirnames(metaObjectDir, + names, e := filteredReaddirnames(metaObjectDir, func(name string) bool { return strings.HasPrefix(name, uploadIDPrefix) }, ) - if err != nil { - return err + if e != nil { + return e } for _, name := range names { - if err := os.Remove(filepath.Join(metaObjectDir, name)); err != nil { + if e := os.Remove(filepath.Join(metaObjectDir, name)); e != nil { //return InternalError{Err: err} - return err + return e } } - if status, err := isDirEmpty(metaObjectDir); err != nil { + if status, e := isDirEmpty(metaObjectDir); e != nil { // TODO: add log than returning error //return InternalError{Err: err} - return err + return e } else if status { - if err := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); err != nil { + if e := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); e != nil { // TODO: add log than returning error //return InternalError{Err: err} - return err + return e } } @@ -199,9 +199,9 @@ func (fs Filesystem) checkBucketArg(bucket string) (string, error) { } bucket = getActualBucketname(fs.path, bucket) - if status, err := isDirExist(filepath.Join(fs.path, bucket)); err != nil { + if status, e := isDirExist(filepath.Join(fs.path, bucket)); e != nil { //return "", InternalError{Err: err} - return "", err + return "", e } else if !status { return "", BucketNotFound{Bucket: bucket} } @@ -210,13 +210,12 @@ func (fs Filesystem) checkBucketArg(bucket string) (string, error) { } func (fs Filesystem) checkDiskFree() error { - di, err := disk.GetInfo(fs.path) - if err != nil { - return err + di, e := disk.GetInfo(fs.path) + if e != nil { + return e } - // Remove 5% from total space for cumulative disk space used for - // journalling, inodes etc. + // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. 
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 if int64(availableDiskSpace) <= fs.minFreeDisk { return RootPathFull{Path: fs.path} @@ -226,9 +225,9 @@ func (fs Filesystem) checkDiskFree() error { } func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) { - bucket, err := fs.checkBucketArg(bucket) - if err != nil { - return "", err + bucket, e := fs.checkBucketArg(bucket) + if e != nil { + return "", e } if !IsValidObjectName(object) { @@ -240,19 +239,19 @@ func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) { // NewMultipartUpload - initiate a new multipart session func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) { - if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil { bucket = bucketDirName } else { - return "", probe.NewError(err) + return "", probe.NewError(e) } - if err := fs.checkDiskFree(); err != nil { - return "", probe.NewError(err) + if e := fs.checkDiskFree(); e != nil { + return "", probe.NewError(e) } - uploadID, err := fs.newUploadID(bucket, object) - if err != nil { - return "", probe.NewError(err) + uploadID, e := fs.newUploadID(bucket, object) + if e != nil { + return "", probe.NewError(e) } return uploadID, nil @@ -260,15 +259,15 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E // PutObjectPart - create a part in a multipart session func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) { - if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil { bucket = bucketDirName } else { - return "", probe.NewError(err) + return "", probe.NewError(e) } - if status, err := fs.isUploadIDExist(bucket, 
object, uploadID); err != nil { + if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil { //return "", probe.NewError(InternalError{Err: err}) - return "", probe.NewError(err) + return "", probe.NewError(e) } else if !status { return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) } @@ -282,13 +281,13 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i return "", probe.NewError(errors.New("invalid part id, should be not more than 10000")) } - if err := fs.checkDiskFree(); err != nil { - return "", probe.NewError(err) + if e := fs.checkDiskFree(); e != nil { + return "", probe.NewError(e) } partFile := filepath.Join(fs.path, configDir, bucket, object, uploadID+"."+strconv.Itoa(partNumber)+"."+md5Hex) - if err := safeWrite(partFile, data, size, md5Hex); err != nil { - return "", probe.NewError(err) + if e := safeWrite(partFile, data, size, md5Hex); e != nil { + return "", probe.NewError(e) } return md5Hex, nil @@ -296,21 +295,21 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i // AbortMultipartUpload - abort an incomplete multipart session func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error { - if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil { bucket = bucketDirName } else { - return probe.NewError(err) + return probe.NewError(e) } - if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil { + if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil { //return probe.NewError(InternalError{Err: err}) - return probe.NewError(err) + return probe.NewError(e) } else if !status { return probe.NewError(InvalidUploadID{UploadID: uploadID}) } - if err := fs.cleanupUploadID(bucket, object, uploadID); err != nil { - return probe.NewError(err) + if e := fs.cleanupUploadID(bucket, object, uploadID); e != nil { + return 
probe.NewError(e) } return nil @@ -318,49 +317,48 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob // CompleteMultipartUpload - complete a multipart upload and persist the data func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) { - if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil { bucket = bucketDirName } else { - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } - if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil { + if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil { //return probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } else if !status { return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) } - if err := fs.checkDiskFree(); err != nil { - return ObjectInfo{}, probe.NewError(err) + if e := fs.checkDiskFree(); e != nil { + return ObjectInfo{}, probe.NewError(e) } metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) - var md5sums []string + var md5Sums []string for _, part := range parts { partNumber := part.PartNumber md5sum := strings.Trim(part.ETag, "\"") partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum) if status, err := isFileExist(partFile); err != nil { - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) return ObjectInfo{}, probe.NewError(err) } else if !status { return ObjectInfo{}, probe.NewError(InvalidPart{}) } - md5sums = append(md5sums, md5sum) + md5Sums = append(md5Sums, md5sum) } // Save the s3 md5. - s3MD5, perr := makeS3MD5(md5sums...) - if perr != nil { - return ObjectInfo{}, perr + s3MD5, err := makeS3MD5(md5Sums...) + if err != nil { + return ObjectInfo{}, err.Trace(md5Sums...) 
} - tempFile, err := ioutil.TempFile(metaObjectDir, uploadID+".complete.") - if err != nil { + tempFile, e := ioutil.TempFile(metaObjectDir, uploadID+".complete.") + if e != nil { //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } for _, part := range parts { @@ -368,38 +366,38 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa md5sum := strings.Trim(part.ETag, "\"") partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum) var f *os.File - f, err = os.Open(partFile) - if err != nil { + f, e = os.Open(partFile) + if e != nil { tempFile.Close() os.Remove(tempFile.Name()) //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) - } else if _, err = io.Copy(tempFile, f); err != nil { + return ObjectInfo{}, probe.NewError(e) + } else if _, e = io.Copy(tempFile, f); e != nil { tempFile.Close() os.Remove(tempFile.Name()) //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } f.Close() } tempFile.Close() // fi is used later - fi, err := os.Stat(tempFile.Name()) - if err != nil { + fi, e := os.Stat(tempFile.Name()) + if e != nil { os.Remove(tempFile.Name()) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } bucketPath := filepath.Join(fs.path, bucket) objectPath := filepath.Join(bucketPath, object) - if err = os.MkdirAll(filepath.Dir(objectPath), 0755); err != nil { + if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil { os.Remove(tempFile.Name()) //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } - if err = os.Rename(tempFile.Name(), objectPath); err != nil { + if e = os.Rename(tempFile.Name(), objectPath); e != nil { 
os.Remove(tempFile.Name()) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } fs.cleanupUploadID(bucket, object, uploadID) // TODO: handle and log the error diff --git a/fs.go b/fs.go index d2c9b519c..386725e4a 100644 --- a/fs.go +++ b/fs.go @@ -17,10 +17,7 @@ package main import ( - "os" - "path/filepath" "sync" - "time" "github.com/minio/minio/pkg/probe" ) @@ -47,28 +44,12 @@ type Filesystem struct { path string minFreeDisk int64 rwLock *sync.RWMutex - multiparts *multiparts listObjectMap map[listObjectParams][]*treeWalker listObjectMapMutex *sync.Mutex listMultipartObjectMap map[listMultipartObjectParams][]multipartObjectInfoChannel listMultipartObjectMapMutex *sync.Mutex } -// MultipartSession holds active session information -type multipartSession struct { - TotalParts int - ObjectName string - UploadID string - Initiated time.Time - Parts []partInfo -} - -// multiparts collection of many parts -type multiparts struct { - Version string `json:"version"` - ActiveSession map[string]*multipartSession `json:"activeSessions"` -} - func (fs *Filesystem) pushListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { fs.listMultipartObjectMapMutex.Lock() defer fs.listMultipartObjectMapMutex.Unlock() @@ -108,31 +89,10 @@ func (fs *Filesystem) popListMultipartObjectCh(params listMultipartObjectParams) // newFS instantiate a new filesystem. 
func newFS(rootPath string) (ObjectAPI, *probe.Error) { - setFSMultipartsMetadataPath(filepath.Join(rootPath, "$multiparts-session.json")) - - var err *probe.Error - // load multiparts session from disk - var mparts *multiparts - mparts, err = loadMultipartsSession() - if err != nil { - if os.IsNotExist(err.ToGoError()) { - mparts = &multiparts{ - Version: "1", - ActiveSession: make(map[string]*multipartSession), - } - if err = saveMultipartsSession(*mparts); err != nil { - return nil, err.Trace() - } - } else { - return nil, err.Trace() - } - } - fs := &Filesystem{ rwLock: &sync.RWMutex{}, } fs.path = rootPath - fs.multiparts = mparts /// Defaults diff --git a/fs_api_suite_test.go b/fs_api_suite_test.go index 87b53426c..804a64db7 100644 --- a/fs_api_suite_test.go +++ b/fs_api_suite_test.go @@ -79,7 +79,7 @@ func testMultipartObjectCreation(c *check.C, create func() ObjectAPI) { } objInfo, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts) c.Assert(err, check.IsNil) - c.Assert(objInfo.MD5Sum, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10") + c.Assert(objInfo.MD5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10") } func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) { From 3fcc60de91918b5942586cf53b95b73662324fac Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 5 Apr 2016 12:26:17 -0700 Subject: [PATCH 2/3] Move the files and rename some functions. - Rename dir.go as 'fs-multipart-dir.go' - Move the push/pop to fs-multipart.go and rename them as save/lookup. - Rename objectInfo instances in fs-multipart as multipartObjInfo. 
--- bucket-handlers.go | 18 ++++++++- dir.go => fs-multipart-dir.go | 3 -- fs-multipart.go | 74 ++++++++++++++++++++++++----------- fs.go | 37 ------------------ 4 files changed, 67 insertions(+), 65 deletions(-) rename dir.go => fs-multipart-dir.go (99%) diff --git a/bucket-handlers.go b/bucket-handlers.go index 644be8c90..66967ddae 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -179,8 +179,22 @@ func (api objectStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, r writeErrorResponse(w, r, ErrInvalidMaxUploads, r.URL.Path) return } - if maxUploads == 0 { - maxUploads = maxObjectList + if keyMarker != "" { + // Unescape keyMarker string, it arrives + // URL-encoded from the client. + keyMarkerUnescaped, e := url.QueryUnescape(keyMarker) + if e != nil { + // Return 'NoSuchKey' to indicate invalid marker key. + writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path) + return + } + keyMarker = keyMarkerUnescaped + // Marker not common with prefix is not implemented, + // reply back 'NotImplemented' in such a case. + if !strings.HasPrefix(keyMarker, prefix) { + writeErrorResponse(w, r, ErrNotImplemented, r.URL.Path) + return + } } listMultipartsInfo, err := api.ObjectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) diff --git a/dir.go b/fs-multipart-dir.go similarity index 99% rename from dir.go rename to fs-multipart-dir.go index 3f8239193..36a953fe2 100644 --- a/dir.go +++ b/fs-multipart-dir.go @@ -135,7 +135,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, timeoutCh := make(chan struct{}, 1) // TODO: check if bucketDir is absolute path - scanDir := bucketDir dirDepth := bucketDir @@ -145,7 +144,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, if strings.HasSuffix(prefixPath, string(os.PathSeparator)) { tmpPrefixPath += string(os.PathSeparator) } - prefixPath = tmpPrefixPath } @@ -375,7 +373,6 @@ func (oic multipartObjectInfoChannel) IsClosed() bool { if oic.objInfo != nil { return false } - return oic.closed 
} diff --git a/fs-multipart.go b/fs-multipart.go index 071cc270e..f70f5625a 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "io/ioutil" - "net/url" "os" "path/filepath" "strconv" @@ -421,6 +420,42 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa return newObject, nil } +func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { + fs.listMultipartObjectMapMutex.Lock() + defer fs.listMultipartObjectMapMutex.Unlock() + + channels := []multipartObjectInfoChannel{ch} + if _, ok := fs.listMultipartObjectMap[params]; ok { + channels = append(fs.listMultipartObjectMap[params], ch) + } + + fs.listMultipartObjectMap[params] = channels +} + +func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel { + fs.listMultipartObjectMapMutex.Lock() + defer fs.listMultipartObjectMapMutex.Unlock() + + if channels, ok := fs.listMultipartObjectMap[params]; ok { + for i, channel := range channels { + if !channel.IsTimedOut() { + chs := channels[i+1:] + if len(chs) > 0 { + fs.listMultipartObjectMap[params] = chs + } else { + delete(fs.listMultipartObjectMap, params) + } + + return &channel + } + } + + // As all channels are timed out, delete the map entry + delete(fs.listMultipartObjectMap, params) + } + return nil +} + // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) { result := ListMultipartsInfo{} @@ -442,13 +477,6 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter)) } - // Unescape keyMarker string - if tmpKeyMarker, err := url.QueryUnescape(keyMarker); err == nil { - 
keyMarker = tmpKeyMarker - } else { - return result, probe.NewError(err) - } - if keyMarker != "" && !strings.HasPrefix(keyMarker, objectPrefix) { return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", keyMarker, objectPrefix)) } @@ -490,48 +518,48 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa bucketDir := filepath.Join(fs.path, bucket) // If listMultipartObjectChannel is available for given parameters, then use it, else create new one - objectInfoCh := fs.popListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker}) - if objectInfoCh == nil { + multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker}) + if multipartObjectInfoCh == nil { ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) - objectInfoCh = &ch + multipartObjectInfoCh = &ch } nextKeyMarker := "" nextUploadIDMarker := "" for i := 0; i < maxUploads; { - objInfo, ok := objectInfoCh.Read() + multipartObjInfo, ok := multipartObjectInfoCh.Read() if !ok { // Closed channel. 
return result, nil } - if objInfo.Err != nil { - return ListMultipartsInfo{}, probe.NewError(objInfo.Err) + if multipartObjInfo.Err != nil { + return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err) } - if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") { + if strings.Contains(multipartObjInfo.Name, "$multiparts") || strings.Contains(multipartObjInfo.Name, "$tmpobject") { continue } - if objInfo.IsDir && skipDir { + if multipartObjInfo.IsDir && skipDir { continue } - if objInfo.IsDir { - result.CommonPrefixes = append(result.CommonPrefixes, objInfo.Name) + if multipartObjInfo.IsDir { + result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name) } else { - result.Uploads = append(result.Uploads, uploadMetadata{Object: objInfo.Name, UploadID: objInfo.UploadID, Initiated: objInfo.ModifiedTime}) + result.Uploads = append(result.Uploads, uploadMetadata{Object: multipartObjInfo.Name, UploadID: multipartObjInfo.UploadID, Initiated: multipartObjInfo.ModifiedTime}) } - nextKeyMarker = objInfo.Name - nextUploadIDMarker = objInfo.UploadID + nextKeyMarker = multipartObjInfo.Name + nextUploadIDMarker = multipartObjInfo.UploadID i++ } - if !objectInfoCh.IsClosed() { + if !multipartObjectInfoCh.IsClosed() { result.IsTruncated = true result.NextKeyMarker = nextKeyMarker result.NextUploadIDMarker = nextUploadIDMarker - fs.pushListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *objectInfoCh) + fs.saveListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *multipartObjectInfoCh) } return result, nil diff --git a/fs.go b/fs.go index 386725e4a..2f639eb94 100644 --- a/fs.go +++ b/fs.go @@ -50,43 +50,6 @@ type Filesystem struct { listMultipartObjectMapMutex *sync.Mutex } -func (fs *Filesystem) pushListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { - 
fs.listMultipartObjectMapMutex.Lock() - defer fs.listMultipartObjectMapMutex.Unlock() - - channels := []multipartObjectInfoChannel{ch} - if _, ok := fs.listMultipartObjectMap[params]; ok { - channels = append(fs.listMultipartObjectMap[params], ch) - } - - fs.listMultipartObjectMap[params] = channels -} - -func (fs *Filesystem) popListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel { - fs.listMultipartObjectMapMutex.Lock() - defer fs.listMultipartObjectMapMutex.Unlock() - - if channels, ok := fs.listMultipartObjectMap[params]; ok { - for i, channel := range channels { - if !channel.IsTimedOut() { - chs := channels[i+1:] - if len(chs) > 0 { - fs.listMultipartObjectMap[params] = chs - } else { - delete(fs.listMultipartObjectMap, params) - } - - return &channel - } - } - - // As all channels are timed out, delete the map entry - delete(fs.listMultipartObjectMap, params) - } - - return nil -} - // newFS instantiate a new filesystem. func newFS(rootPath string) (ObjectAPI, *probe.Error) { fs := &Filesystem{ From 8986a6802ae0b4d80a634668a3bc2e617803d542 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 5 Apr 2016 15:08:59 -0700 Subject: [PATCH 3/3] Fix ListMultipartUploads 'mc ls -I' now works properly. 
--- api-response.go | 7 +++- fs-multipart.go | 86 ++++++++++++++++++++++++++++--------------------- 2 files changed, 55 insertions(+), 38 deletions(-) diff --git a/api-response.go b/api-response.go index e9c047673..11a3ba351 100644 --- a/api-response.go +++ b/api-response.go @@ -359,7 +359,12 @@ func generateListMultipartUploadsResponse(bucket string, multipartsInfo ListMult listMultipartUploadsResponse.MaxUploads = multipartsInfo.MaxUploads listMultipartUploadsResponse.NextUploadIDMarker = multipartsInfo.NextUploadIDMarker listMultipartUploadsResponse.UploadIDMarker = multipartsInfo.UploadIDMarker - + listMultipartUploadsResponse.CommonPrefixes = make([]CommonPrefix, len(multipartsInfo.CommonPrefixes)) + for index, commonPrefix := range multipartsInfo.CommonPrefixes { + listMultipartUploadsResponse.CommonPrefixes[index] = CommonPrefix{ + Prefix: commonPrefix, + } + } listMultipartUploadsResponse.Uploads = make([]Upload, len(multipartsInfo.Uploads)) for index, upload := range multipartsInfo.Uploads { newUpload := Upload{} diff --git a/fs-multipart.go b/fs-multipart.go index f70f5625a..43ba56151 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -57,7 +57,14 @@ func removeFileTree(fileName string, level string) error { return nil } -func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error { +func safeRemoveFile(file *os.File) error { + if e := file.Close(); e != nil { + return e + } + return os.Remove(file.Name()) +} + +func safeWriteFile(fileName string, data io.Reader, size int64, md5sum string) error { tempFile, e := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-") if e != nil { return e @@ -66,8 +73,7 @@ func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error md5Hasher := md5.New() multiWriter := io.MultiWriter(md5Hasher, tempFile) if _, e := io.CopyN(multiWriter, data, size); e != nil { - tempFile.Close() - os.Remove(tempFile.Name()) + safeRemoveFile(tempFile) return e } tempFile.Close() @@ 
-284,8 +290,8 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i return "", probe.NewError(e) } - partFile := filepath.Join(fs.path, configDir, bucket, object, uploadID+"."+strconv.Itoa(partNumber)+"."+md5Hex) - if e := safeWrite(partFile, data, size, md5Hex); e != nil { + partFile := filepath.Join(fs.path, configDir, bucket, object, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5Hex)) + if e := safeWriteFile(partFile, data, size, md5Hex); e != nil { return "", probe.NewError(e) } @@ -356,34 +362,30 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa tempFile, e := ioutil.TempFile(metaObjectDir, uploadID+".complete.") if e != nil { - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) return ObjectInfo{}, probe.NewError(e) } - for _, part := range parts { partNumber := part.PartNumber - md5sum := strings.Trim(part.ETag, "\"") - partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum) - var f *os.File - f, e = os.Open(partFile) + // Trim off the odd double quotes from ETag in the beginning and end. + md5sum := strings.TrimPrefix(part.ETag, "\"") + md5sum = strings.TrimSuffix(md5sum, "\"") + partFileStr := filepath.Join(metaObjectDir, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5sum)) + var partFile *os.File + partFile, e = os.Open(partFileStr) if e != nil { - tempFile.Close() - os.Remove(tempFile.Name()) - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) + safeRemoveFile(tempFile) return ObjectInfo{}, probe.NewError(e) - } else if _, e = io.Copy(tempFile, f); e != nil { - tempFile.Close() - os.Remove(tempFile.Name()) - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) + } else if _, e = io.Copy(tempFile, partFile); e != nil { + safeRemoveFile(tempFile) return ObjectInfo{}, probe.NewError(e) } - f.Close() + partFile.Close() // Close part file after successful copy. 
} tempFile.Close() - - // fi is used later - fi, e := os.Stat(tempFile.Name()) + // Stat to gather fresh stat info. + objSt, e := os.Stat(tempFile.Name()) if e != nil { - os.Remove(tempFile.Name()) + os.Remove(tempFile.Name()) // Cleanup the leftover temporary file. return ObjectInfo{}, probe.NewError(e) } @@ -391,7 +393,6 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa objectPath := filepath.Join(bucketPath, object) if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil { os.Remove(tempFile.Name()) - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) return ObjectInfo{}, probe.NewError(e) } if e = os.Rename(tempFile.Name(), objectPath); e != nil { @@ -411,8 +412,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa newObject := ObjectInfo{ Bucket: bucket, Name: object, - ModifiedTime: fi.ModTime(), - Size: fi.Size(), + ModifiedTime: objSt.ModTime(), + Size: objSt.Size(), ContentType: contentType, MD5Sum: s3MD5, } @@ -482,18 +483,14 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa } markerPath := filepath.FromSlash(keyMarker) - if uploadIDMarker != "" { if strings.HasSuffix(markerPath, string(os.PathSeparator)) { return result, probe.NewError(fmt.Errorf("Invalid combination of uploadID marker '%s' and marker '%s'", uploadIDMarker, keyMarker)) } - - id, err := uuid.Parse(uploadIDMarker) - - if err != nil { - return result, probe.NewError(err) + id, e := uuid.Parse(uploadIDMarker) + if e != nil { + return result, probe.NewError(e) } - if id.IsZero() { return result, probe.NewError(fmt.Errorf("Invalid upload ID marker %s", uploadIDMarker)) } @@ -516,9 +513,16 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa recursive = false } - bucketDir := filepath.Join(fs.path, bucket) - // If listMultipartObjectChannel is available for given parameters, then use it, else create new one - multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, 
markerPath, prefixPath, uploadIDMarker}) + bucketDir := filepath.Join(fs.path, configDir, bucket) + // Lookup of if listMultipartObjectChannel is available for given + // parameters, else create a new one. + multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{ + bucket: bucket, + delimiter: delimiter, + keyMarker: markerPath, + prefix: prefixPath, + uploadIDMarker: uploadIDMarker, + }) if multipartObjectInfoCh == nil { ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) multipartObjectInfoCh = &ch @@ -534,10 +538,14 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa } if multipartObjInfo.Err != nil { + if os.IsNotExist(multipartObjInfo.Err) { + return ListMultipartsInfo{}, nil + } return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err) } - if strings.Contains(multipartObjInfo.Name, "$multiparts") || strings.Contains(multipartObjInfo.Name, "$tmpobject") { + if strings.Contains(multipartObjInfo.Name, "$multiparts") || + strings.Contains(multipartObjInfo.Name, "$tmpobject") { continue } @@ -548,7 +556,11 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa if multipartObjInfo.IsDir { result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name) } else { - result.Uploads = append(result.Uploads, uploadMetadata{Object: multipartObjInfo.Name, UploadID: multipartObjInfo.UploadID, Initiated: multipartObjInfo.ModifiedTime}) + result.Uploads = append(result.Uploads, uploadMetadata{ + Object: multipartObjInfo.Name, + UploadID: multipartObjInfo.UploadID, + Initiated: multipartObjInfo.ModifiedTime, + }) } nextKeyMarker = multipartObjInfo.Name nextUploadIDMarker = multipartObjInfo.UploadID