From 083e4e9479d76d8d167e1559e646eb52f724ba76 Mon Sep 17 00:00:00 2001
From: "Bala.FA"
Date: Mon, 28 Mar 2016 22:22:09 +0530
Subject: [PATCH 1/4] backend/fs: Refactor multipart upload

This patch reworks the multipart upload functions as follows (a short
sketch of the resulting path layout appears after the last patch in this
section):

* New multipart upload call creates the file
  EXPORT_DIR/.minio/BUCKET/PATH/TO/OBJECT/UPLOAD_ID.uploadid

* Put object part call creates the file
  EXPORT_DIR/.minio/BUCKET/PATH/TO/OBJECT/UPLOAD_ID.PART_NUMBER.MD5SUM_STRING

* Abort multipart call removes all files matching
  EXPORT_DIR/.minio/BUCKET/PATH/TO/OBJECT/UPLOAD_ID.*

* Complete multipart call
  1. creates a staging file
     EXPORT_DIR/.minio/BUCKET/PATH/TO/OBJECT/UPLOAD_ID.complete.TEMP_NAME,
     then renames it to
     EXPORT_DIR/.minio/BUCKET/PATH/TO/OBJECT/UPLOAD_ID.complete
  2. renames the staging file
     EXPORT_DIR/.minio/BUCKET/PATH/TO/OBJECT/UPLOAD_ID.complete to
     EXPORT_DIR/BUCKET/PATH/TO/OBJECT
---
 dir.go                                 | 398 ++++++++
 fs-bucket-listobjects.go               |  13 +-
 fs-multipart.go                        | 876 +++++++++---------
 fs.go                                  |  63 +-
 .../skyrings/skyring-common/LICENSE    | 202 ++++
 .../skyring-common/tools/uuid/uuid.go  | 133 +++
 vendor/vendor.json                     |   9 +-
 7 files changed, 1231 insertions(+), 463 deletions(-)
 create mode 100644 dir.go
 create mode 100644 vendor/github.com/skyrings/skyring-common/LICENSE
 create mode 100644 vendor/github.com/skyrings/skyring-common/tools/uuid/uuid.go

diff --git a/dir.go b/dir.go
new file mode 100644
index 000000000..3f8239193
--- /dev/null
+++ b/dir.go
@@ -0,0 +1,398 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package main + +import ( + "errors" + "io" + "os" + "path/filepath" + "sort" + "strings" + "time" +) + +// DirEntry - directory entry +type DirEntry struct { + Name string + Size int64 + Mode os.FileMode + ModTime time.Time +} + +// IsDir - returns true if DirEntry is a directory +func (entry DirEntry) IsDir() bool { + return entry.Mode.IsDir() +} + +// IsSymlink - returns true if DirEntry is a symbolic link +func (entry DirEntry) IsSymlink() bool { + return entry.Mode&os.ModeSymlink == os.ModeSymlink +} + +// IsRegular - returns true if DirEntry is a regular file +func (entry DirEntry) IsRegular() bool { + return entry.Mode.IsRegular() +} + +// sort interface for DirEntry slice +type byEntryName []DirEntry + +func (f byEntryName) Len() int { return len(f) } +func (f byEntryName) Swap(i, j int) { f[i], f[j] = f[j], f[i] } +func (f byEntryName) Less(i, j int) bool { return f[i].Name < f[j].Name } + +func filteredReaddir(dirname string, filter func(DirEntry) bool, appendPath bool) ([]DirEntry, error) { + result := []DirEntry{} + + d, err := os.Open(dirname) + if err != nil { + return result, err + } + + defer d.Close() + + for { + fis, err := d.Readdir(1000) + if err != nil { + if err == io.EOF { + break + } + + return result, err + } + + for _, fi := range fis { + name := fi.Name() + if appendPath { + name = filepath.Join(dirname, name) + } + + if fi.IsDir() { + name += string(os.PathSeparator) + } + + entry := DirEntry{Name: name, Size: fi.Size(), Mode: fi.Mode(), ModTime: fi.ModTime()} + + if filter == nil || filter(entry) { + result = append(result, entry) + } + } + } + + sort.Sort(byEntryName(result)) + + return result, nil +} + +func filteredReaddirnames(dirname string, filter func(string) bool) ([]string, error) { + result := []string{} + d, err := os.Open(dirname) + if err != nil { + return result, err + } + + defer d.Close() + + for { + names, err := d.Readdirnames(1000) + if err != nil { + if err == io.EOF { + break + } + + return result, err + } + + for _, name := range names { + if filter == nil || filter(name) { + result = append(result, name) + } + } + } + + sort.Strings(result) + + return result, nil +} + +func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) multipartObjectInfoChannel { + objectInfoCh := make(chan multipartObjectInfo, listObjectsLimit) + timeoutCh := make(chan struct{}, 1) + + // TODO: check if bucketDir is absolute path + + scanDir := bucketDir + dirDepth := bucketDir + + if prefixPath != "" { + if !filepath.IsAbs(prefixPath) { + tmpPrefixPath := filepath.Join(bucketDir, prefixPath) + if strings.HasSuffix(prefixPath, string(os.PathSeparator)) { + tmpPrefixPath += string(os.PathSeparator) + } + + prefixPath = tmpPrefixPath + } + + // TODO: check if prefixPath starts with bucketDir + + // Case #1: if prefixPath is /mnt/mys3/mybucket/2012/photos/paris, then + // dirDepth is /mnt/mys3/mybucket/2012/photos + // Case #2: if prefixPath is /mnt/mys3/mybucket/2012/photos/, then + // dirDepth is /mnt/mys3/mybucket/2012/photos + dirDepth = filepath.Dir(prefixPath) + scanDir = dirDepth + } else { + prefixPath = bucketDir + } + + if markerPath != "" { + if !filepath.IsAbs(markerPath) { + tmpMarkerPath := filepath.Join(bucketDir, markerPath) + if strings.HasSuffix(markerPath, string(os.PathSeparator)) { + tmpMarkerPath += string(os.PathSeparator) + } + + markerPath = tmpMarkerPath + } + + // TODO: check markerPath must be a file + if uploadIDMarker != "" { + markerPath = filepath.Join(markerPath, 
uploadIDMarker+uploadIDSuffix) + } + + // TODO: check if markerPath starts with bucketDir + // TODO: check if markerPath starts with prefixPath + + // Case #1: if markerPath is /mnt/mys3/mybucket/2012/photos/gophercon.png, then + // scanDir is /mnt/mys3/mybucket/2012/photos + // Case #2: if markerPath is /mnt/mys3/mybucket/2012/photos/gophercon.png/1fbd117a-268a-4ed0-85c9-8cc3888cbf20.uploadid, then + // scanDir is /mnt/mys3/mybucket/2012/photos/gophercon.png + // Case #3: if markerPath is /mnt/mys3/mybucket/2012/photos/, then + // scanDir is /mnt/mys3/mybucket/2012/photos + + scanDir = filepath.Dir(markerPath) + } else { + markerPath = bucketDir + } + + // Have bucketDir ends with os.PathSeparator + if !strings.HasSuffix(bucketDir, string(os.PathSeparator)) { + bucketDir += string(os.PathSeparator) + } + + // Remove os.PathSeparator if scanDir ends with + if strings.HasSuffix(scanDir, string(os.PathSeparator)) { + scanDir = filepath.Dir(scanDir) + } + + // goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel. + go func() { + defer close(objectInfoCh) + defer close(timeoutCh) + + // send function - returns true if ObjectInfo is sent + // within (time.Second * 15) else false on timeout. + send := func(oi multipartObjectInfo) bool { + timer := time.After(time.Second * 15) + select { + case objectInfoCh <- oi: + return true + case <-timer: + timeoutCh <- struct{}{} + return false + } + } + + for { + entries, err := filteredReaddir(scanDir, + func(entry DirEntry) bool { + if entry.IsDir() || (entry.IsRegular() && strings.HasSuffix(entry.Name, uploadIDSuffix)) { + return strings.HasPrefix(entry.Name, prefixPath) && entry.Name > markerPath + } + + return false + }, + true) + if err != nil { + send(multipartObjectInfo{Err: err}) + return + } + + var entry DirEntry + for len(entries) > 0 { + entry, entries = entries[0], entries[1:] + + if entry.IsRegular() { + // Handle uploadid file + name := strings.Replace(filepath.Dir(entry.Name), bucketDir, "", 1) + if name == "" { + // This should not happen ie uploadid file should not be in bucket directory + send(multipartObjectInfo{Err: errors.New("corrupted meta data")}) + return + } + + uploadID := strings.Split(filepath.Base(entry.Name), uploadIDSuffix)[0] + + objInfo := multipartObjectInfo{ + Name: name, + UploadID: uploadID, + ModifiedTime: entry.ModTime, + } + + if !send(objInfo) { + return + } + + continue + } + + subentries, err := filteredReaddir(entry.Name, + func(entry DirEntry) bool { + return entry.IsDir() || (entry.IsRegular() && strings.HasSuffix(entry.Name, uploadIDSuffix)) + }, + true) + if err != nil { + send(multipartObjectInfo{Err: err}) + return + } + + subDirFound := false + uploadIDEntries := []DirEntry{} + // If subentries has a directory, then current entry needs to be sent + for _, subentry := range subentries { + if subentry.IsDir() { + subDirFound = true + + if recursive { + break + } + } + + if !recursive && subentry.IsRegular() { + uploadIDEntries = append(uploadIDEntries, subentry) + } + } + + if subDirFound || len(subentries) == 0 { + objInfo := multipartObjectInfo{ + Name: strings.Replace(entry.Name, bucketDir, "", 1), + ModifiedTime: entry.ModTime, + IsDir: true, + } + + if !send(objInfo) { + return + } + } + + if recursive { + entries = append(subentries, entries...) + } else { + entries = append(uploadIDEntries, entries...) 
+ } + } + + if !recursive { + break + } + + markerPath = scanDir + string(os.PathSeparator) + + if scanDir = filepath.Dir(scanDir); scanDir < dirDepth { + break + } + } + }() + + return multipartObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh} +} + +// multipartObjectInfo - Multipart object info +type multipartObjectInfo struct { + Name string + UploadID string + ModifiedTime time.Time + IsDir bool + Err error +} + +// multipartObjectInfoChannel - multipart object info channel +type multipartObjectInfoChannel struct { + ch <-chan multipartObjectInfo + objInfo *multipartObjectInfo + closed bool + timeoutCh <-chan struct{} + timedOut bool +} + +func (oic *multipartObjectInfoChannel) Read() (multipartObjectInfo, bool) { + if oic.closed { + return multipartObjectInfo{}, false + } + + if oic.objInfo == nil { + // First read. + if oi, ok := <-oic.ch; ok { + oic.objInfo = &oi + } else { + oic.closed = true + return multipartObjectInfo{}, false + } + } + + retObjInfo := *oic.objInfo + status := true + oic.objInfo = nil + + // Read once more to know whether it was last read. + if oi, ok := <-oic.ch; ok { + oic.objInfo = &oi + } else { + oic.closed = true + } + + return retObjInfo, status +} + +// IsClosed - return whether channel is closed or not. +func (oic multipartObjectInfoChannel) IsClosed() bool { + if oic.objInfo != nil { + return false + } + + return oic.closed +} + +// IsTimedOut - return whether channel is closed due to timeout. +func (oic multipartObjectInfoChannel) IsTimedOut() bool { + if oic.timedOut { + return true + } + + select { + case _, ok := <-oic.timeoutCh: + if ok { + oic.timedOut = true + return true + } + return false + default: + return false + } +} diff --git a/fs-bucket-listobjects.go b/fs-bucket-listobjects.go index dfa6d40de..10375e57f 100644 --- a/fs-bucket-listobjects.go +++ b/fs-bucket-listobjects.go @@ -31,12 +31,17 @@ const ( ) // isDirExist - returns whether given directory is exist or not. -func isDirExist(dirname string) (status bool, err error) { +func isDirExist(dirname string) (bool, error) { fi, err := os.Lstat(dirname) - if err == nil { - status = fi.IsDir() + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + + return false, err } - return + + return fi.IsDir(), nil } func (fs *Filesystem) saveTreeWalk(params listObjectParams, walker *treeWalker) { diff --git a/fs-multipart.go b/fs-multipart.go index bdd6ad3dd..6a8396e56 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -18,125 +18,86 @@ package main import ( "crypto/md5" - "encoding/base64" "encoding/hex" "errors" "fmt" "io" - "math/rand" + "io/ioutil" + "net/url" "os" "path/filepath" - "sort" "strconv" "strings" - "time" - "github.com/minio/minio/pkg/atomic" - "github.com/minio/minio/pkg/crypto/sha512" "github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/probe" + "github.com/skyrings/skyring-common/tools/uuid" ) -// isValidUploadID - is upload id. 
-func (fs Filesystem) isValidUploadID(object, uploadID string) (ok bool) { - fs.rwLock.RLock() - defer fs.rwLock.RUnlock() - _, ok = fs.multiparts.ActiveSession[uploadID] - if !ok { - return - } - return -} - -// byObjectInfoKey is a sortable interface for UploadMetadata slice -type byUploadMetadataKey []uploadMetadata - -func (b byUploadMetadataKey) Len() int { return len(b) } -func (b byUploadMetadataKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] } -func (b byUploadMetadataKey) Less(i, j int) bool { return b[i].Object < b[j].Object } +const configDir = ".minio" +const uploadIDSuffix = ".uploadid" -// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata -func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) { - // Input validation. - if !IsValidBucketName(bucket) { - return ListMultipartsInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) +func removeFileTree(fileName string, level string) error { + if err := os.Remove(fileName); err != nil { + return err } - bucket = getActualBucketname(fs.path, bucket) - bucketPath := filepath.Join(fs.path, bucket) - if _, e := os.Stat(bucketPath); e != nil { - // Check bucket exists. - if os.IsNotExist(e) { - return ListMultipartsInfo{}, probe.NewError(BucketNotFound{Bucket: bucket}) + + for fileDir := filepath.Dir(fileName); fileDir > level; fileDir = filepath.Dir(fileDir) { + if status, err := isDirEmpty(fileDir); err != nil { + return err + } else if !status { + break } - return ListMultipartsInfo{}, probe.NewError(e) - } - var uploads []uploadMetadata - multipartsInfo := ListMultipartsInfo{} - - fs.rwLock.RLock() - defer fs.rwLock.RUnlock() - for uploadID, session := range fs.multiparts.ActiveSession { - objectName := session.ObjectName - if strings.HasPrefix(objectName, objectPrefix) { - if len(uploads) > maxUploads { - sort.Sort(byUploadMetadataKey(uploads)) - multipartsInfo.Uploads = uploads - multipartsInfo.NextKeyMarker = session.ObjectName - multipartsInfo.NextUploadIDMarker = uploadID - multipartsInfo.IsTruncated = true - return multipartsInfo, nil - } - // uploadIDMarker is ignored if KeyMarker is empty. - switch { - case keyMarker != "" && uploadIDMarker == "": - if objectName > keyMarker { - upload := uploadMetadata{} - upload.Object = objectName - upload.UploadID = uploadID - upload.Initiated = session.Initiated - uploads = append(uploads, upload) - } - case keyMarker != "" && uploadIDMarker != "": - if session.UploadID > uploadIDMarker { - if objectName >= keyMarker { - upload := uploadMetadata{} - upload.Object = objectName - upload.UploadID = uploadID - upload.Initiated = session.Initiated - uploads = append(uploads, upload) - } - } - default: - upload := uploadMetadata{} - upload.Object = objectName - upload.UploadID = uploadID - upload.Initiated = session.Initiated - uploads = append(uploads, upload) - } + + if err := os.Remove(fileDir); err != nil { + return err } } - sort.Sort(byUploadMetadataKey(uploads)) - multipartsInfo.Uploads = uploads - return multipartsInfo, nil + + return nil } -// verify if parts sent over the network do really match with what we -// have for the session. 
-func doPartsMatch(parts []completePart, savedParts []partInfo) bool { - if parts == nil || savedParts == nil { - return false +func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error { + tempFile, err := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-") + if err != nil { + return err } - if len(parts) != len(savedParts) { - return false + + md5Hasher := md5.New() + multiWriter := io.MultiWriter(md5Hasher, tempFile) + if _, err := io.CopyN(multiWriter, data, size); err != nil { + tempFile.Close() + os.Remove(tempFile.Name()) + return err } - // Range of incoming parts and compare them with saved parts. - for i, part := range parts { - if strings.Trim(part.ETag, "\"") != savedParts[i].ETag { - return false + tempFile.Close() + + dataMd5sum := hex.EncodeToString(md5Hasher.Sum(nil)) + if md5sum != "" && !isMD5SumEqual(md5sum, dataMd5sum) { + os.Remove(tempFile.Name()) + return BadDigest{ExpectedMD5: md5sum, CalculatedMD5: dataMd5sum} + } + + if err := os.Rename(tempFile.Name(), fileName); err != nil { + os.Remove(tempFile.Name()) + return err + } + + return nil +} + +func isFileExist(filename string) (bool, error) { + fi, err := os.Lstat(filename) + if err != nil { + if os.IsNotExist(err) { + return false, nil } + + return false, err } - return true + + return fi.Mode().IsRegular(), nil } // Create an s3 compatible MD5sum for complete multipart transaction. @@ -155,491 +116,502 @@ func makeS3MD5(md5Strs ...string) (string, *probe.Error) { return s3MD5, nil } -type multiCloser struct { - Closers []io.Closer -} +func (fs Filesystem) newUploadID(bucket, object string) (string, error) { + metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) -func (m multiCloser) Close() error { - for _, c := range m.Closers { - if e := c.Close(); e != nil { - return e + // create metaObjectDir if not exist + if status, err := isDirExist(metaObjectDir); err != nil { + return "", err + } else if !status { + if err := os.MkdirAll(metaObjectDir, 0755); err != nil { + return "", err } } - return nil -} -// MultiCloser - returns a Closer that's the logical -// concatenation of the provided input closers. They're closed -// sequentially. If any of the closers return a non-nil error, Close -// will return that error. -func MultiCloser(closers ...io.Closer) io.Closer { - return multiCloser{closers} -} - -// removeParts - remove all parts. -func removeParts(partPathPrefix string, parts []partInfo) *probe.Error { - for _, part := range parts { - // We are on purpose ignoring the return values here, since - // another thread would have purged these entries. - os.Remove(partPathPrefix + part.ETag + fmt.Sprintf("$%d-$multiparts", part.PartNumber)) - } - return nil -} + for { + uuid, err := uuid.New() + if err != nil { + return "", err + } -// saveParts - concantenate and save all parts. 
-func saveParts(partPathPrefix string, mw io.Writer, parts []completePart) *probe.Error { - var partReaders []io.Reader - var partClosers []io.Closer - for _, part := range parts { - // Trim prefix - md5Sum := strings.TrimPrefix(part.ETag, "\"") - // Trim suffix - md5Sum = strings.TrimSuffix(md5Sum, "\"") - partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) - if e != nil { - if !os.IsNotExist(e) { - return probe.NewError(e) + uploadID := uuid.String() + uploadIDFile := filepath.Join(metaObjectDir, uploadID+uploadIDSuffix) + if _, err := os.Lstat(uploadIDFile); err != nil { + if !os.IsNotExist(err) { + return "", err } - // Some clients do not set Content-Md5, so we would have - // created part files without 'ETag' in them. - partFile, e = os.OpenFile(partPathPrefix+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) - if e != nil { - return probe.NewError(e) + + // uploadIDFile doesn't exist, so create empty file to reserve the name + if err := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); err != nil { + return "", err } + + return uploadID, nil } - partReaders = append(partReaders, partFile) - partClosers = append(partClosers, partFile) + // uploadIDFile already exists. + // loop again to try with different uuid generated. } - // Concatenate a list of closers and close upon return. - closer := MultiCloser(partClosers...) - defer closer.Close() +} - reader := io.MultiReader(partReaders...) - readBufferSize := 8 * 1024 * 1024 // 8MiB - readBuffer := make([]byte, readBufferSize) // Allocate 8MiB buffer. - if _, e := io.CopyBuffer(mw, reader, readBuffer); e != nil { - return probe.NewError(e) - } - return nil +func (fs Filesystem) isUploadIDExist(bucket, object, uploadID string) (bool, error) { + return isFileExist(filepath.Join(fs.path, configDir, bucket, object, uploadID+uploadIDSuffix)) } -// NewMultipartUpload - initiate a new multipart session -func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) { - di, e := disk.GetInfo(fs.path) - if e != nil { - return "", probe.NewError(e) - } +func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error { + metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) + uploadIDPrefix := uploadID + "." - // Remove 5% from total space for cumulative disk space used for - // journalling, inodes etc. - availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 - if int64(availableDiskSpace) <= fs.minFreeDisk { - return "", probe.NewError(RootPathFull{Path: fs.path}) - } + names, err := filteredReaddirnames(metaObjectDir, + func(name string) bool { + return strings.HasPrefix(name, uploadIDPrefix) + }, + ) - // Input validation. - if !IsValidBucketName(bucket) { - return "", probe.NewError(BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectName(object) { - return "", probe.NewError(ObjectNameInvalid{Object: object}) + if err != nil { + return err } - bucket = getActualBucketname(fs.path, bucket) - bucketPath := filepath.Join(fs.path, bucket) - if _, e = os.Stat(bucketPath); e != nil { - // Check bucket exists. 
- if os.IsNotExist(e) { - return "", probe.NewError(BucketNotFound{Bucket: bucket}) + for _, name := range names { + if err := os.Remove(filepath.Join(metaObjectDir, name)); err != nil { + //return InternalError{Err: err} + return err } - return "", probe.NewError(e) } - objectPath := filepath.Join(bucketPath, object) - objectDir := filepath.Dir(objectPath) - if _, e = os.Stat(objectDir); e != nil { - if !os.IsNotExist(e) { - return "", probe.NewError(e) - } - e = os.MkdirAll(objectDir, 0700) - if e != nil { - return "", probe.NewError(e) + if status, err := isDirEmpty(metaObjectDir); err != nil { + // TODO: add log than returning error + //return InternalError{Err: err} + return err + } else if status { + if err := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); err != nil { + // TODO: add log than returning error + //return InternalError{Err: err} + return err } } - // Generate new upload id. - id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + object + time.Now().String()) - uploadIDSum := sha512.Sum512(id) - uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] - - // Critical region requiring write lock. - fs.rwLock.Lock() - defer fs.rwLock.Unlock() - // Initialize multipart session. - mpartSession := &multipartSession{} - mpartSession.TotalParts = 0 - mpartSession.ObjectName = object - mpartSession.UploadID = uploadID - mpartSession.Initiated = time.Now().UTC() - // Multipart has maximum of 10000 parts. - var parts []partInfo - mpartSession.Parts = parts - - fs.multiparts.ActiveSession[uploadID] = mpartSession - if err := saveMultipartsSession(*fs.multiparts); err != nil { - return "", err.Trace(objectPath) - } - return uploadID, nil + return nil } -// Remove all duplicated parts based on the latest time of their upload. -func removeDuplicateParts(parts []partInfo) []partInfo { - length := len(parts) - 1 - for i := 0; i < length; i++ { - for j := i + 1; j <= length; j++ { - if parts[i].PartNumber == parts[j].PartNumber { - if parts[i].LastModified.Sub(parts[j].LastModified) > 0 { - parts[i] = parts[length] - } else { - parts[j] = parts[length] - } - parts = parts[0:length] - length-- - j-- - } - } +func (fs Filesystem) checkBucketArg(bucket string) (string, error) { + if !IsValidBucketName(bucket) { + return "", BucketNameInvalid{Bucket: bucket} } - return parts -} -// partNumber is a sortable interface for Part slice. -type partNumber []partInfo + bucket = getActualBucketname(fs.path, bucket) + if status, err := isDirExist(filepath.Join(fs.path, bucket)); err != nil { + //return "", InternalError{Err: err} + return "", err + } else if !status { + return "", BucketNotFound{Bucket: bucket} + } -func (a partNumber) Len() int { return len(a) } -func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } + return bucket, nil +} -// PutObjectPart - create a part in a multipart session -func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) { +func (fs Filesystem) checkDiskFree() error { di, err := disk.GetInfo(fs.path) if err != nil { - return "", probe.NewError(err) + return err } - // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. + // Remove 5% from total space for cumulative disk space used for + // journalling, inodes etc. 
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 if int64(availableDiskSpace) <= fs.minFreeDisk { - return "", probe.NewError(RootPathFull{Path: fs.path}) + return RootPathFull{Path: fs.path} } - // Check bucket name valid. - if !IsValidBucketName(bucket) { - return "", probe.NewError(BucketNameInvalid{Bucket: bucket}) + return nil +} + +func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) { + bucket, err := fs.checkBucketArg(bucket) + if err != nil { + return "", err } - // Verify object path legal. if !IsValidObjectName(object) { - return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) + return "", ObjectNameInvalid{Object: object} } - // Part id cannot be negative. - if partID <= 0 { - return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero")) - } + return bucket, nil +} - // Verify upload is valid for the incoming object. - if !fs.isValidUploadID(object, uploadID) { - return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) +// NewMultipartUpload - initiate a new multipart session +func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) { + if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + bucket = bucketDirName + } else { + return "", probe.NewError(err) } - bucket = getActualBucketname(fs.path, bucket) - bucketPath := filepath.Join(fs.path, bucket) - if _, e := os.Stat(bucketPath); e != nil { - // Check bucket exists. - if os.IsNotExist(e) { - return "", probe.NewError(BucketNotFound{Bucket: bucket}) - } - return "", probe.NewError(e) + if err := fs.checkDiskFree(); err != nil { + return "", probe.NewError(err) } - objectPath := filepath.Join(bucketPath, object) - partPathPrefix := objectPath + uploadID - partPath := partPathPrefix + md5Hex + fmt.Sprintf("$%d-$multiparts", partID) - partFile, e := atomic.FileCreateWithPrefix(partPath, "$multiparts") - if e != nil { - return "", probe.NewError(e) + uploadID, err := fs.newUploadID(bucket, object) + if err != nil { + return "", probe.NewError(err) } - defer partFile.Close() - - // Initialize md5 writer. - md5Writer := md5.New() - // Create a multiwriter. - multiWriter := io.MultiWriter(md5Writer, partFile) + return uploadID, nil +} - if _, e = io.CopyN(multiWriter, data, size); e != nil { - partFile.CloseAndPurge() - return "", probe.NewError(e) +// PutObjectPart - create a part in a multipart session +func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) { + if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + bucket = bucketDirName + } else { + return "", probe.NewError(err) } - // Finalize new md5. - newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) - if md5Hex != "" { - if newMD5Hex != md5Hex { - return "", probe.NewError(BadDigest{md5Hex, newMD5Hex}) - } + if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil { + //return "", probe.NewError(InternalError{Err: err}) + return "", probe.NewError(err) + } else if !status { + return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) } - // Stat the file to get the latest information. - fi, e := os.Stat(partFile.Name()) - if e != nil { - return "", probe.NewError(e) + // Part id cannot be negative. 
+ if partNumber <= 0 { + return "", probe.NewError(errors.New("invalid part id, cannot be zero or less than zero")) } - prtInfo := partInfo{} - prtInfo.PartNumber = partID - prtInfo.ETag = newMD5Hex - prtInfo.Size = fi.Size() - prtInfo.LastModified = fi.ModTime() - // Critical region requiring read lock. - fs.rwLock.RLock() - deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID] - fs.rwLock.RUnlock() - if !ok { - return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) + if partNumber > 10000 { + return "", probe.NewError(errors.New("invalid part id, should be not more than 10000")) } - // Add all incoming parts. - deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, prtInfo) + if err := fs.checkDiskFree(); err != nil { + return "", probe.NewError(err) + } - // Remove duplicate parts based on the most recent uploaded. - deserializedMultipartSession.Parts = removeDuplicateParts(deserializedMultipartSession.Parts) + partFile := filepath.Join(fs.path, configDir, bucket, object, uploadID+"."+strconv.Itoa(partNumber)+"."+md5Hex) + if err := safeWrite(partFile, data, size, md5Hex); err != nil { + return "", probe.NewError(err) + } - // Save total parts uploaded. - deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts) + return md5Hex, nil +} - // Sort by part number before saving. - sort.Sort(partNumber(deserializedMultipartSession.Parts)) +// AbortMultipartUpload - abort an incomplete multipart session +func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error { + if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + bucket = bucketDirName + } else { + return probe.NewError(err) + } - // Critical region requiring write lock. - fs.rwLock.Lock() - defer fs.rwLock.Unlock() + if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil { + //return probe.NewError(InternalError{Err: err}) + return probe.NewError(err) + } else if !status { + return probe.NewError(InvalidUploadID{UploadID: uploadID}) + } - fs.multiparts.ActiveSession[uploadID] = deserializedMultipartSession - if err := saveMultipartsSession(*fs.multiparts); err != nil { - return "", err.Trace(partPathPrefix) + if err := fs.cleanupUploadID(bucket, object, uploadID); err != nil { + return probe.NewError(err) } - return newMD5Hex, nil + + return nil } // CompleteMultipartUpload - complete a multipart upload and persist the data -func (fs Filesystem) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) { - // Check bucket name is valid. - if !IsValidBucketName(bucket) { - return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) +func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) { + if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + bucket = bucketDirName + } else { + return ObjectInfo{}, probe.NewError(err) } - // Verify object path is legal. - if !IsValidObjectName(object) { - return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) + if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil { + //return probe.NewError(InternalError{Err: err}) + return ObjectInfo{}, probe.NewError(err) + } else if !status { + return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) } - // Verify if valid upload for incoming object. 
- if !fs.isValidUploadID(object, uploadID) { - return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) + if err := fs.checkDiskFree(); err != nil { + return ObjectInfo{}, probe.NewError(err) } - bucket = getActualBucketname(fs.path, bucket) - bucketPath := filepath.Join(fs.path, bucket) - if _, e := os.Stat(bucketPath); e != nil { - // Check bucket exists. - if os.IsNotExist(e) { - return ObjectInfo{}, probe.NewError(BucketNotFound{Bucket: bucket}) + metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) + + var md5sums []string + for _, part := range parts { + partNumber := part.PartNumber + md5sum := strings.Trim(part.ETag, "\"") + partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum) + if status, err := isFileExist(partFile); err != nil { + //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) + return ObjectInfo{}, probe.NewError(err) + } else if !status { + return ObjectInfo{}, probe.NewError(InvalidPart{}) } - return ObjectInfo{}, probe.NewError(InternalError{}) + md5sums = append(md5sums, md5sum) } - objectPath := filepath.Join(bucketPath, object) - objectWriter, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject") - if e != nil { - return ObjectInfo{}, probe.NewError(e) + // Save the s3 md5. + s3MD5, perr := makeS3MD5(md5sums...) + if perr != nil { + return ObjectInfo{}, perr } - // Critical region requiring read lock. - fs.rwLock.RLock() - savedParts := fs.multiparts.ActiveSession[uploadID].Parts - fs.rwLock.RUnlock() - - if !doPartsMatch(parts, savedParts) { - objectWriter.CloseAndPurge() - return ObjectInfo{}, probe.NewError(InvalidPart{}) + tempFile, err := ioutil.TempFile(metaObjectDir, uploadID+".complete.") + if err != nil { + //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) + return ObjectInfo{}, probe.NewError(err) } - // Parts successfully validated, save all the parts. - partPathPrefix := objectPath + uploadID - if err := saveParts(partPathPrefix, objectWriter, parts); err != nil { - objectWriter.CloseAndPurge() - return ObjectInfo{}, err.Trace(partPathPrefix) - } - var md5Strs []string - for _, part := range savedParts { - md5Strs = append(md5Strs, part.ETag) + for _, part := range parts { + partNumber := part.PartNumber + md5sum := strings.Trim(part.ETag, "\"") + partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum) + var f *os.File + f, err = os.Open(partFile) + if err != nil { + tempFile.Close() + os.Remove(tempFile.Name()) + //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) + return ObjectInfo{}, probe.NewError(err) + } else if _, err = io.Copy(tempFile, f); err != nil { + tempFile.Close() + os.Remove(tempFile.Name()) + //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) + return ObjectInfo{}, probe.NewError(err) + } + f.Close() } - // Save the s3 md5. - s3MD5, err := makeS3MD5(md5Strs...) + tempFile.Close() + // fi is used later + fi, err := os.Stat(tempFile.Name()) if err != nil { - objectWriter.CloseAndPurge() - return ObjectInfo{}, err.Trace(md5Strs...) + os.Remove(tempFile.Name()) + return ObjectInfo{}, probe.NewError(err) } - // Successfully saved multipart, remove all parts in a routine. - go removeParts(partPathPrefix, savedParts) - - // Critical region requiring write lock. 
- fs.rwLock.Lock() - delete(fs.multiparts.ActiveSession, uploadID) - if err := saveMultipartsSession(*fs.multiparts); err != nil { - fs.rwLock.Unlock() - objectWriter.CloseAndPurge() - return ObjectInfo{}, err.Trace(partPathPrefix) + bucketPath := filepath.Join(fs.path, bucket) + objectPath := filepath.Join(bucketPath, object) + if err = os.MkdirAll(filepath.Dir(objectPath), 0755); err != nil { + os.Remove(tempFile.Name()) + //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) + return ObjectInfo{}, probe.NewError(err) } - if e = objectWriter.Close(); e != nil { - fs.rwLock.Unlock() - return ObjectInfo{}, probe.NewError(e) + if err = os.Rename(tempFile.Name(), objectPath); err != nil { + os.Remove(tempFile.Name()) + return ObjectInfo{}, probe.NewError(err) } - fs.rwLock.Unlock() - // Send stat again to get object metadata. - st, e := os.Stat(objectPath) - if e != nil { - return ObjectInfo{}, probe.NewError(e) - } + fs.cleanupUploadID(bucket, object, uploadID) // TODO: handle and log the error contentType := "application/octet-stream" if objectExt := filepath.Ext(objectPath); objectExt != "" { - content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))] - if ok { + if content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]; ok { contentType = content.ContentType } } + newObject := ObjectInfo{ Bucket: bucket, Name: object, - ModifiedTime: st.ModTime(), - Size: st.Size(), + ModifiedTime: fi.ModTime(), + Size: fi.Size(), ContentType: contentType, MD5Sum: s3MD5, } + return newObject, nil } -// ListObjectParts - list parts from incomplete multipart session. -func (fs Filesystem) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error) { - // Check bucket name is valid. - if !IsValidBucketName(bucket) { - return ListPartsInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) - } +// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata +func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) { + result := ListMultipartsInfo{} - // Verify object path legal. - if !IsValidObjectName(object) { - return ListPartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) + if bucketDirName, err := fs.checkBucketArg(bucket); err == nil { + bucket = bucketDirName + } else { + return result, probe.NewError(err) } - // Verify if upload id is valid for incoming object. - if !fs.isValidUploadID(object, uploadID) { - return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) + if !IsValidObjectPrefix(objectPrefix) { + return result, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: objectPrefix}) } - prtsInfo := ListPartsInfo{} - prtsInfo.Bucket = bucket - prtsInfo.Object = object - var startPartNumber int - switch { - case partNumberMarker == 0: - startPartNumber = 1 - default: - startPartNumber = partNumberMarker + prefixPath := filepath.FromSlash(objectPrefix) + + // Verify if delimiter is anything other than '/', which we do not support. + if delimiter != "" && delimiter != "/" { + return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter)) } - bucket = getActualBucketname(fs.path, bucket) - bucketPath := filepath.Join(fs.path, bucket) - if _, e := os.Stat(bucketPath); e != nil { - // Check bucket exists. 
- if os.IsNotExist(e) { - return ListPartsInfo{}, probe.NewError(BucketNotFound{Bucket: bucket}) - } - return ListPartsInfo{}, probe.NewError(e) + // Unescape keyMarker string + if tmpKeyMarker, err := url.QueryUnescape(keyMarker); err == nil { + keyMarker = tmpKeyMarker + } else { + return result, probe.NewError(err) } - // Critical region requiring read lock. - fs.rwLock.RLock() - deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID] - fs.rwLock.RUnlock() - if !ok { - return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) + if keyMarker != "" && !strings.HasPrefix(keyMarker, objectPrefix) { + return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", keyMarker, objectPrefix)) } - var parts []partInfo - for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ { - if len(parts) > maxParts { - sort.Sort(partNumber(parts)) - prtsInfo.IsTruncated = true - prtsInfo.Parts = parts - prtsInfo.NextPartNumberMarker = i - return prtsInfo, nil + + markerPath := filepath.FromSlash(keyMarker) + + if uploadIDMarker != "" { + if strings.HasSuffix(markerPath, string(os.PathSeparator)) { + return result, probe.NewError(fmt.Errorf("Invalid combination of uploadID marker '%s' and marker '%s'", uploadIDMarker, keyMarker)) + } + + id, err := uuid.Parse(uploadIDMarker) + + if err != nil { + return result, probe.NewError(err) + } + + if id.IsZero() { + return result, probe.NewError(fmt.Errorf("Invalid upload ID marker %s", uploadIDMarker)) } - parts = append(parts, deserializedMultipartSession.Parts[i-1]) } - sort.Sort(partNumber(parts)) - prtsInfo.Parts = parts - return prtsInfo, nil -} -// AbortMultipartUpload - abort an incomplete multipart session -func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error { - // Check bucket name valid. - if !IsValidBucketName(bucket) { - return probe.NewError(BucketNameInvalid{Bucket: bucket}) + // Return empty response if maxUploads is zero + if maxUploads == 0 { + return result, nil } - // Verify object path legal. - if !IsValidObjectName(object) { - return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) + // set listObjectsLimit to maxUploads for out-of-range limit + if maxUploads < 0 || maxUploads > listObjectsLimit { + maxUploads = listObjectsLimit } - if !fs.isValidUploadID(object, uploadID) { - return probe.NewError(InvalidUploadID{UploadID: uploadID}) + recursive := true + skipDir := true + if delimiter == "/" { + skipDir = false + recursive = false } - bucket = getActualBucketname(fs.path, bucket) - bucketPath := filepath.Join(fs.path, bucket) - if _, e := os.Stat(bucketPath); e != nil { - // Check bucket exists. - if os.IsNotExist(e) { - return probe.NewError(BucketNotFound{Bucket: bucket}) + bucketDir := filepath.Join(fs.path, bucket) + // If listMultipartObjectChannel is available for given parameters, then use it, else create new one + objectInfoCh := fs.popListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker}) + if objectInfoCh == nil { + ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) + objectInfoCh = &ch + } + + nextKeyMarker := "" + nextUploadIDMarker := "" + for i := 0; i < maxUploads; { + objInfo, ok := objectInfoCh.Read() + if !ok { + // Closed channel. 
+ return result, nil + } + + if objInfo.Err != nil { + return ListMultipartsInfo{}, probe.NewError(objInfo.Err) } - return probe.NewError(e) + + if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") { + continue + } + + if objInfo.IsDir && skipDir { + continue + } + + if objInfo.IsDir { + result.CommonPrefixes = append(result.CommonPrefixes, objInfo.Name) + } else { + result.Uploads = append(result.Uploads, uploadMetadata{Object: objInfo.Name, UploadID: objInfo.UploadID, Initiated: objInfo.ModifiedTime}) + } + nextKeyMarker = objInfo.Name + nextUploadIDMarker = objInfo.UploadID + i++ } - objectPath := filepath.Join(bucketPath, object) - partPathPrefix := objectPath + uploadID + if !objectInfoCh.IsClosed() { + result.IsTruncated = true + result.NextKeyMarker = nextKeyMarker + result.NextUploadIDMarker = nextUploadIDMarker + fs.pushListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *objectInfoCh) + } - // Critical region requiring read lock. - fs.rwLock.RLock() - savedParts := fs.multiparts.ActiveSession[uploadID].Parts - fs.rwLock.RUnlock() + return result, nil +} - // Remove all parts. - if err := removeParts(partPathPrefix, savedParts); err != nil { - return err.Trace(partPathPrefix) +// ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata +func (fs Filesystem) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error) { + if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + bucket = bucketDirName + } else { + return ListPartsInfo{}, probe.NewError(err) } - // Critical region requiring write lock. - fs.rwLock.Lock() - defer fs.rwLock.Unlock() + if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil { + //return probe.NewError(InternalError{Err: err}) + return ListPartsInfo{}, probe.NewError(err) + } else if !status { + return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) + } - delete(fs.multiparts.ActiveSession, uploadID) - if err := saveMultipartsSession(*fs.multiparts); err != nil { - return err.Trace(partPathPrefix) + metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) + entries, err := filteredReaddir(metaObjectDir, + func(entry DirEntry) bool { + if tokens := strings.Split(entry.Name, "."); len(tokens) == 3 { + if tokens[0] == uploadID { + if partNumber, err := strconv.Atoi(tokens[1]); err == nil { + if partNumber >= 1 && partNumber <= 10000 && partNumber > partNumberMarker { + return true + } + } + } + } + + return false + }, + false, + ) + + if err != nil { + return ListPartsInfo{}, probe.NewError(err) } - return nil + + isTruncated := false + if maxParts <= 0 || maxParts > 1000 { + maxParts = 1000 + } + nextPartNumberMarker := 0 + + parts := []partInfo{} + for i := range entries { + if i == maxParts { + isTruncated = true + break + } + + tokens := strings.Split(entries[i].Name, ".") + partNumber, _ := strconv.Atoi(tokens[1]) + md5sum := tokens[2] + parts = append(parts, partInfo{ + PartNumber: partNumber, + LastModified: entries[i].ModTime, + ETag: md5sum, + Size: entries[i].Size, + }) + } + + if isTruncated { + nextPartNumberMarker = 0 + } + + return ListPartsInfo{ + Bucket: bucket, + Object: object, + UploadID: uploadID, + PartNumberMarker: partNumberMarker, + NextPartNumberMarker: nextPartNumberMarker, + MaxParts: maxParts, + IsTruncated: isTruncated, + Parts: parts, + }, nil } diff --git 
a/fs.go b/fs.go index 93ffcc2f5..d2c9b519c 100644 --- a/fs.go +++ b/fs.go @@ -33,14 +33,25 @@ type listObjectParams struct { prefix string } +// listMultipartObjectParams - list multipart object params used for list multipart object map +type listMultipartObjectParams struct { + bucket string + delimiter string + keyMarker string + prefix string + uploadIDMarker string +} + // Filesystem - local variables type Filesystem struct { - path string - minFreeDisk int64 - rwLock *sync.RWMutex - multiparts *multiparts - listObjectMap map[listObjectParams][]*treeWalker - listObjectMapMutex *sync.Mutex + path string + minFreeDisk int64 + rwLock *sync.RWMutex + multiparts *multiparts + listObjectMap map[listObjectParams][]*treeWalker + listObjectMapMutex *sync.Mutex + listMultipartObjectMap map[listMultipartObjectParams][]multipartObjectInfoChannel + listMultipartObjectMapMutex *sync.Mutex } // MultipartSession holds active session information @@ -58,6 +69,43 @@ type multiparts struct { ActiveSession map[string]*multipartSession `json:"activeSessions"` } +func (fs *Filesystem) pushListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { + fs.listMultipartObjectMapMutex.Lock() + defer fs.listMultipartObjectMapMutex.Unlock() + + channels := []multipartObjectInfoChannel{ch} + if _, ok := fs.listMultipartObjectMap[params]; ok { + channels = append(fs.listMultipartObjectMap[params], ch) + } + + fs.listMultipartObjectMap[params] = channels +} + +func (fs *Filesystem) popListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel { + fs.listMultipartObjectMapMutex.Lock() + defer fs.listMultipartObjectMapMutex.Unlock() + + if channels, ok := fs.listMultipartObjectMap[params]; ok { + for i, channel := range channels { + if !channel.IsTimedOut() { + chs := channels[i+1:] + if len(chs) > 0 { + fs.listMultipartObjectMap[params] = chs + } else { + delete(fs.listMultipartObjectMap, params) + } + + return &channel + } + } + + // As all channels are timed out, delete the map entry + delete(fs.listMultipartObjectMap, params) + } + + return nil +} + // newFS instantiate a new filesystem. func newFS(rootPath string) (ObjectAPI, *probe.Error) { setFSMultipartsMetadataPath(filepath.Join(rootPath, "$multiparts-session.json")) @@ -94,6 +142,9 @@ func newFS(rootPath string) (ObjectAPI, *probe.Error) { fs.listObjectMap = make(map[listObjectParams][]*treeWalker) fs.listObjectMapMutex = &sync.Mutex{} + fs.listMultipartObjectMap = make(map[listMultipartObjectParams][]multipartObjectInfoChannel) + fs.listMultipartObjectMapMutex = &sync.Mutex{} + // Return here. return fs, nil } diff --git a/vendor/github.com/skyrings/skyring-common/LICENSE b/vendor/github.com/skyrings/skyring-common/LICENSE new file mode 100644 index 000000000..8f71f43fe --- /dev/null +++ b/vendor/github.com/skyrings/skyring-common/LICENSE @@ -0,0 +1,202 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ diff --git a/vendor/github.com/skyrings/skyring-common/tools/uuid/uuid.go b/vendor/github.com/skyrings/skyring-common/tools/uuid/uuid.go new file mode 100644 index 000000000..978b3f976 --- /dev/null +++ b/vendor/github.com/skyrings/skyring-common/tools/uuid/uuid.go @@ -0,0 +1,133 @@ +// Copyright 2015 Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package uuid + +import ( + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "io" + "strings" +) + +// UUID is 128bits = 16bytes +type UUID [16]byte + +func (uuid UUID) String() string { + return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]) +} + +func (uuid UUID) IsZero() bool { + var zeroUuid UUID + return Equal(zeroUuid, uuid) +} + +func (uuid UUID) MarshalJSON() ([]byte, error) { + return []byte(`"` + uuid.String() + `"`), nil +} + +func (uuid *UUID) UnmarshalJSON(b []byte) error { + if u, err := Parse(string(b)); err != nil { + return err + } else { + copy(uuid[:], u[:]) + return nil + } +} + +func New() (*UUID, error) { + uuid := new(UUID) + + n, err := io.ReadFull(rand.Reader, uuid[:]) + if err != nil { + return nil, err + } else if n != len(uuid) { + return nil, errors.New(fmt.Sprintf("insufficient random data (expected: %d, read: %d)", len(uuid), n)) + } else { + // variant bits; for more info + // see https://www.ietf.org/rfc/rfc4122.txt section 4.1.1 + uuid[8] = uuid[8]&0x3f | 0x80 + // version 4 (pseudo-random); for more info + // see https://www.ietf.org/rfc/rfc4122.txt section 4.1.3 + uuid[6] = uuid[6]&0x0f | 0x40 + } + + return uuid, nil +} + +func Equal(uuid1 UUID, uuid2 UUID) bool { + for i, v := range uuid1 { + if v != uuid2[i] { + return false + } + } + + return true +} + +func Parse(s string) (*UUID, error) { + // the string format should be either in + // xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (or) + // xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + + // If the uuid is marshaled by us we add " " around the uuid. + // while parsing this, we have to remove the " " around the + // uuid. So we check if uuid has " " around it, if yes we remove + // it. 
+ + if strings.HasPrefix(s, "\"") && strings.HasSuffix(s, "\"") { + s = s[1 : len(s)-1] + } + + uuid := new(UUID) + if len(s) == 36 { + if ba, err := hex.DecodeString(s[0:8]); err == nil { + copy(uuid[:4], ba) + } else { + return nil, err + } + if ba, err := hex.DecodeString(s[9:13]); err == nil { + copy(uuid[4:], ba) + } else { + return nil, err + } + if ba, err := hex.DecodeString(s[14:18]); err == nil { + copy(uuid[6:], ba) + } else { + return nil, err + } + if ba, err := hex.DecodeString(s[19:23]); err == nil { + copy(uuid[8:], ba) + } else { + return nil, err + } + if ba, err := hex.DecodeString(s[24:]); err == nil { + copy(uuid[10:], ba) + } else { + return nil, err + } + } else if len(s) == 32 { + if ba, err := hex.DecodeString(s); err == nil { + copy(uuid[:], ba) + } else { + return nil, err + } + } else { + return nil, errors.New("unknown UUID string " + s) + } + + return uuid, nil +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 97b24e80d..39a70c2da 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -107,6 +107,12 @@ "revision": "eb527c8097e0f19a3ff7b253a3fe70545070f420", "revisionTime": "2015-08-29T22:34:20-07:00" }, + { + "checksumSHA1": "u0hXGADM3JDza8YjgiyNJpAJk8g=", + "path": "github.com/skyrings/skyring-common/tools/uuid", + "revision": "762fd2bfc12e766d90478d638255981ab1966a3d", + "revisionTime": "2016-03-24T19:44:43+05:30" + }, { "path": "golang.org/x/crypto/bcrypt", "revision": "7b85b097bf7527677d54d3220065e966a0e3b613", @@ -122,5 +128,6 @@ "revision": "11d3bc7aa68e238947792f30573146a3231fc0f1", "revisionTime": "2015-07-29T10:04:31+02:00" } - ] + ], + "rootPath": "github.com/minio/minio" } From 9632c94e7aa8d1e9514003b83c17ca83ac3d85f2 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 5 Apr 2016 11:24:23 -0700 Subject: [PATCH 2/4] Fix list objects test and remove all the old unnecessary files. - Fix tests for new changes. - Change Golang err as 'e' for the time being, before we bring in probe removal change. - Remove old structs and temporary files. --- fs-backend-metadata.go | 56 ------------ fs-bucket-listobjects.go | 15 ++-- fs-multipart.go | 190 +++++++++++++++++++-------------------- fs.go | 40 --------- fs_api_suite_test.go | 2 +- 5 files changed, 101 insertions(+), 202 deletions(-) delete mode 100644 fs-backend-metadata.go diff --git a/fs-backend-metadata.go b/fs-backend-metadata.go deleted file mode 100644 index 100ef3df9..000000000 --- a/fs-backend-metadata.go +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2015, 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "github.com/minio/minio/pkg/probe" - "github.com/minio/minio/pkg/quick" -) - -var multipartsMetadataPath string - -// SetFSMultipartsMetadataPath - set custom multiparts session metadata path. -func setFSMultipartsMetadataPath(metadataPath string) { - multipartsMetadataPath = metadataPath -} - -// saveMultipartsSession - save multiparts. 
-func saveMultipartsSession(mparts multiparts) *probe.Error { - qc, err := quick.New(mparts) - if err != nil { - return err.Trace() - } - if err := qc.Save(multipartsMetadataPath); err != nil { - return err.Trace() - } - return nil -} - -// loadMultipartsSession load multipart session file. -func loadMultipartsSession() (*multiparts, *probe.Error) { - mparts := &multiparts{} - mparts.Version = "1" - mparts.ActiveSession = make(map[string]*multipartSession) - qc, err := quick.New(mparts) - if err != nil { - return nil, err.Trace() - } - if err := qc.Load(multipartsMetadataPath); err != nil { - return nil, err.Trace() - } - return qc.Data().(*multiparts), nil -} diff --git a/fs-bucket-listobjects.go b/fs-bucket-listobjects.go index 10375e57f..6e55f3b8b 100644 --- a/fs-bucket-listobjects.go +++ b/fs-bucket-listobjects.go @@ -32,15 +32,13 @@ const ( // isDirExist - returns whether given directory is exist or not. func isDirExist(dirname string) (bool, error) { - fi, err := os.Lstat(dirname) - if err != nil { - if os.IsNotExist(err) { + fi, e := os.Lstat(dirname) + if e != nil { + if os.IsNotExist(e) { return false, nil } - - return false, err + return false, e } - return fi.IsDir(), nil } @@ -129,9 +127,8 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe // Verify if prefix exists. prefixDir := filepath.Dir(filepath.FromSlash(prefix)) rootDir := filepath.Join(bucketDir, prefixDir) - _, e := isDirExist(rootDir) - if e != nil { - if os.IsNotExist(e) { + if status, e := isDirExist(rootDir); !status { + if e == nil { // Prefix does not exist, not an error just respond empty // list response. return result, nil diff --git a/fs-multipart.go b/fs-multipart.go index 6a8396e56..071cc270e 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -39,19 +39,19 @@ const configDir = ".minio" const uploadIDSuffix = ".uploadid" func removeFileTree(fileName string, level string) error { - if err := os.Remove(fileName); err != nil { - return err + if e := os.Remove(fileName); e != nil { + return e } for fileDir := filepath.Dir(fileName); fileDir > level; fileDir = filepath.Dir(fileDir) { - if status, err := isDirEmpty(fileDir); err != nil { - return err + if status, e := isDirEmpty(fileDir); e != nil { + return e } else if !status { break } - if err := os.Remove(fileDir); err != nil { - return err + if e := os.Remove(fileDir); e != nil { + return e } } @@ -59,17 +59,17 @@ func removeFileTree(fileName string, level string) error { } func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error { - tempFile, err := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-") - if err != nil { - return err + tempFile, e := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-") + if e != nil { + return e } md5Hasher := md5.New() multiWriter := io.MultiWriter(md5Hasher, tempFile) - if _, err := io.CopyN(multiWriter, data, size); err != nil { + if _, e := io.CopyN(multiWriter, data, size); e != nil { tempFile.Close() os.Remove(tempFile.Name()) - return err + return e } tempFile.Close() @@ -79,22 +79,22 @@ func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error return BadDigest{ExpectedMD5: md5sum, CalculatedMD5: dataMd5sum} } - if err := os.Rename(tempFile.Name(), fileName); err != nil { + if e := os.Rename(tempFile.Name(), fileName); e != nil { os.Remove(tempFile.Name()) - return err + return e } return nil } func isFileExist(filename string) (bool, error) { - fi, err := os.Lstat(filename) - if err != nil { - if 
os.IsNotExist(err) { + fi, e := os.Lstat(filename) + if e != nil { + if os.IsNotExist(e) { return false, nil } - return false, err + return false, e } return fi.Mode().IsRegular(), nil @@ -120,30 +120,30 @@ func (fs Filesystem) newUploadID(bucket, object string) (string, error) { metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) // create metaObjectDir if not exist - if status, err := isDirExist(metaObjectDir); err != nil { - return "", err + if status, e := isDirExist(metaObjectDir); e != nil { + return "", e } else if !status { - if err := os.MkdirAll(metaObjectDir, 0755); err != nil { - return "", err + if e := os.MkdirAll(metaObjectDir, 0755); e != nil { + return "", e } } for { - uuid, err := uuid.New() - if err != nil { - return "", err + uuid, e := uuid.New() + if e != nil { + return "", e } uploadID := uuid.String() uploadIDFile := filepath.Join(metaObjectDir, uploadID+uploadIDSuffix) - if _, err := os.Lstat(uploadIDFile); err != nil { - if !os.IsNotExist(err) { - return "", err + if _, e := os.Lstat(uploadIDFile); e != nil { + if !os.IsNotExist(e) { + return "", e } // uploadIDFile doesn't exist, so create empty file to reserve the name - if err := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); err != nil { - return "", err + if e := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); e != nil { + return "", e } return uploadID, nil @@ -161,32 +161,32 @@ func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error { metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) uploadIDPrefix := uploadID + "." - names, err := filteredReaddirnames(metaObjectDir, + names, e := filteredReaddirnames(metaObjectDir, func(name string) bool { return strings.HasPrefix(name, uploadIDPrefix) }, ) - if err != nil { - return err + if e != nil { + return e } for _, name := range names { - if err := os.Remove(filepath.Join(metaObjectDir, name)); err != nil { + if e := os.Remove(filepath.Join(metaObjectDir, name)); e != nil { //return InternalError{Err: err} - return err + return e } } - if status, err := isDirEmpty(metaObjectDir); err != nil { + if status, e := isDirEmpty(metaObjectDir); e != nil { // TODO: add log than returning error //return InternalError{Err: err} - return err + return e } else if status { - if err := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); err != nil { + if e := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); e != nil { // TODO: add log than returning error //return InternalError{Err: err} - return err + return e } } @@ -199,9 +199,9 @@ func (fs Filesystem) checkBucketArg(bucket string) (string, error) { } bucket = getActualBucketname(fs.path, bucket) - if status, err := isDirExist(filepath.Join(fs.path, bucket)); err != nil { + if status, e := isDirExist(filepath.Join(fs.path, bucket)); e != nil { //return "", InternalError{Err: err} - return "", err + return "", e } else if !status { return "", BucketNotFound{Bucket: bucket} } @@ -210,13 +210,12 @@ func (fs Filesystem) checkBucketArg(bucket string) (string, error) { } func (fs Filesystem) checkDiskFree() error { - di, err := disk.GetInfo(fs.path) - if err != nil { - return err + di, e := disk.GetInfo(fs.path) + if e != nil { + return e } - // Remove 5% from total space for cumulative disk space used for - // journalling, inodes etc. + // Remove 5% from total space for cumulative disk space used for journalling, inodes etc. 
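+ // For intuition, with assumed figures (not taken from this patch): if
+ // di.Total is 100GiB and di.Free is 4GiB, the expression below yields
+ // 4 / (100 - 5) * 100, roughly 4.2, so any configured minFreeDisk of 5
+ // or more makes the check fail with RootPathFull.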
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 if int64(availableDiskSpace) <= fs.minFreeDisk { return RootPathFull{Path: fs.path} @@ -226,9 +225,9 @@ func (fs Filesystem) checkDiskFree() error { } func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) { - bucket, err := fs.checkBucketArg(bucket) - if err != nil { - return "", err + bucket, e := fs.checkBucketArg(bucket) + if e != nil { + return "", e } if !IsValidObjectName(object) { @@ -240,19 +239,19 @@ func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) { // NewMultipartUpload - initiate a new multipart session func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) { - if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil { bucket = bucketDirName } else { - return "", probe.NewError(err) + return "", probe.NewError(e) } - if err := fs.checkDiskFree(); err != nil { - return "", probe.NewError(err) + if e := fs.checkDiskFree(); e != nil { + return "", probe.NewError(e) } - uploadID, err := fs.newUploadID(bucket, object) - if err != nil { - return "", probe.NewError(err) + uploadID, e := fs.newUploadID(bucket, object) + if e != nil { + return "", probe.NewError(e) } return uploadID, nil @@ -260,15 +259,15 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E // PutObjectPart - create a part in a multipart session func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) { - if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil { bucket = bucketDirName } else { - return "", probe.NewError(err) + return "", probe.NewError(e) } - if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil { + if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil { //return "", probe.NewError(InternalError{Err: err}) - return "", probe.NewError(err) + return "", probe.NewError(e) } else if !status { return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) } @@ -282,13 +281,13 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i return "", probe.NewError(errors.New("invalid part id, should be not more than 10000")) } - if err := fs.checkDiskFree(); err != nil { - return "", probe.NewError(err) + if e := fs.checkDiskFree(); e != nil { + return "", probe.NewError(e) } partFile := filepath.Join(fs.path, configDir, bucket, object, uploadID+"."+strconv.Itoa(partNumber)+"."+md5Hex) - if err := safeWrite(partFile, data, size, md5Hex); err != nil { - return "", probe.NewError(err) + if e := safeWrite(partFile, data, size, md5Hex); e != nil { + return "", probe.NewError(e) } return md5Hex, nil @@ -296,21 +295,21 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i // AbortMultipartUpload - abort an incomplete multipart session func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error { - if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil { bucket = bucketDirName } else { - return probe.NewError(err) + return probe.NewError(e) } - if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil { 
+ if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil { //return probe.NewError(InternalError{Err: err}) - return probe.NewError(err) + return probe.NewError(e) } else if !status { return probe.NewError(InvalidUploadID{UploadID: uploadID}) } - if err := fs.cleanupUploadID(bucket, object, uploadID); err != nil { - return probe.NewError(err) + if e := fs.cleanupUploadID(bucket, object, uploadID); e != nil { + return probe.NewError(e) } return nil @@ -318,49 +317,48 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob // CompleteMultipartUpload - complete a multipart upload and persist the data func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) { - if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil { + if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil { bucket = bucketDirName } else { - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } - if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil { + if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil { //return probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } else if !status { return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) } - if err := fs.checkDiskFree(); err != nil { - return ObjectInfo{}, probe.NewError(err) + if e := fs.checkDiskFree(); e != nil { + return ObjectInfo{}, probe.NewError(e) } metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) - var md5sums []string + var md5Sums []string for _, part := range parts { partNumber := part.PartNumber md5sum := strings.Trim(part.ETag, "\"") partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum) if status, err := isFileExist(partFile); err != nil { - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) return ObjectInfo{}, probe.NewError(err) } else if !status { return ObjectInfo{}, probe.NewError(InvalidPart{}) } - md5sums = append(md5sums, md5sum) + md5Sums = append(md5Sums, md5sum) } // Save the s3 md5. - s3MD5, perr := makeS3MD5(md5sums...) - if perr != nil { - return ObjectInfo{}, perr + s3MD5, err := makeS3MD5(md5Sums...) + if err != nil { + return ObjectInfo{}, err.Trace(md5Sums...) 
} - tempFile, err := ioutil.TempFile(metaObjectDir, uploadID+".complete.") - if err != nil { + tempFile, e := ioutil.TempFile(metaObjectDir, uploadID+".complete.") + if e != nil { //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } for _, part := range parts { @@ -368,38 +366,38 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa md5sum := strings.Trim(part.ETag, "\"") partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum) var f *os.File - f, err = os.Open(partFile) - if err != nil { + f, e = os.Open(partFile) + if e != nil { tempFile.Close() os.Remove(tempFile.Name()) //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) - } else if _, err = io.Copy(tempFile, f); err != nil { + return ObjectInfo{}, probe.NewError(e) + } else if _, e = io.Copy(tempFile, f); e != nil { tempFile.Close() os.Remove(tempFile.Name()) //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } f.Close() } tempFile.Close() // fi is used later - fi, err := os.Stat(tempFile.Name()) - if err != nil { + fi, e := os.Stat(tempFile.Name()) + if e != nil { os.Remove(tempFile.Name()) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } bucketPath := filepath.Join(fs.path, bucket) objectPath := filepath.Join(bucketPath, object) - if err = os.MkdirAll(filepath.Dir(objectPath), 0755); err != nil { + if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil { os.Remove(tempFile.Name()) //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } - if err = os.Rename(tempFile.Name(), objectPath); err != nil { + if e = os.Rename(tempFile.Name(), objectPath); e != nil { os.Remove(tempFile.Name()) - return ObjectInfo{}, probe.NewError(err) + return ObjectInfo{}, probe.NewError(e) } fs.cleanupUploadID(bucket, object, uploadID) // TODO: handle and log the error diff --git a/fs.go b/fs.go index d2c9b519c..386725e4a 100644 --- a/fs.go +++ b/fs.go @@ -17,10 +17,7 @@ package main import ( - "os" - "path/filepath" "sync" - "time" "github.com/minio/minio/pkg/probe" ) @@ -47,28 +44,12 @@ type Filesystem struct { path string minFreeDisk int64 rwLock *sync.RWMutex - multiparts *multiparts listObjectMap map[listObjectParams][]*treeWalker listObjectMapMutex *sync.Mutex listMultipartObjectMap map[listMultipartObjectParams][]multipartObjectInfoChannel listMultipartObjectMapMutex *sync.Mutex } -// MultipartSession holds active session information -type multipartSession struct { - TotalParts int - ObjectName string - UploadID string - Initiated time.Time - Parts []partInfo -} - -// multiparts collection of many parts -type multiparts struct { - Version string `json:"version"` - ActiveSession map[string]*multipartSession `json:"activeSessions"` -} - func (fs *Filesystem) pushListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { fs.listMultipartObjectMapMutex.Lock() defer fs.listMultipartObjectMapMutex.Unlock() @@ -108,31 +89,10 @@ func (fs *Filesystem) popListMultipartObjectCh(params listMultipartObjectParams) // newFS instantiate a new filesystem. 
func newFS(rootPath string) (ObjectAPI, *probe.Error) {
- setFSMultipartsMetadataPath(filepath.Join(rootPath, "$multiparts-session.json"))
-
- var err *probe.Error
- // load multiparts session from disk
- var mparts *multiparts
- mparts, err = loadMultipartsSession()
- if err != nil {
- if os.IsNotExist(err.ToGoError()) {
- mparts = &multiparts{
- Version: "1",
- ActiveSession: make(map[string]*multipartSession),
- }
- if err = saveMultipartsSession(*mparts); err != nil {
- return nil, err.Trace()
- }
- } else {
- return nil, err.Trace()
- }
- }
-
 fs := &Filesystem{
 rwLock: &sync.RWMutex{},
 }
 fs.path = rootPath
- fs.multiparts = mparts

 /// Defaults
diff --git a/fs_api_suite_test.go b/fs_api_suite_test.go
index 87b53426c..804a64db7 100644
--- a/fs_api_suite_test.go
+++ b/fs_api_suite_test.go
@@ -79,7 +79,7 @@ func testMultipartObjectCreation(c *check.C, create func() ObjectAPI) {
 }
 objInfo, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts)
 c.Assert(err, check.IsNil)
- c.Assert(objInfo.MD5Sum, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10")
+ c.Assert(objInfo.MD5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10")
 }

 func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) {

From 3fcc60de91918b5942586cf53b95b73662324fac Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Tue, 5 Apr 2016 12:26:17 -0700
Subject: [PATCH 3/4] Move the files and rename some functions.

- Rename dir.go to 'fs-multipart-dir.go'.
- Move the push/pop helpers to fs-multipart.go and rename them to save/lookup.
- Rename objectInfo instances in fs-multipart to multipartObjInfo.
---
 bucket-handlers.go | 16 +++++++-
 dir.go => fs-multipart-dir.go | 3 --
 fs-multipart.go | 74 ++++++++++++++++++++++++-----------
 fs.go | 37 ------------------
 4 files changed, 65 insertions(+), 65 deletions(-)
 rename dir.go => fs-multipart-dir.go (99%)

diff --git a/bucket-handlers.go b/bucket-handlers.go
index 644be8c90..66967ddae 100644
--- a/bucket-handlers.go
+++ b/bucket-handlers.go
@@ -179,8 +179,20 @@ func (api objectStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, r
 writeErrorResponse(w, r, ErrInvalidMaxUploads, r.URL.Path)
 return
 }
- if maxUploads == 0 {
- maxUploads = maxObjectList
+ if keyMarker != "" {
+ // Unescape keyMarker string
+ keyMarkerUnescaped, e := url.QueryUnescape(keyMarker)
+ if e != nil {
+ // Return 'NoSuchKey' to indicate invalid marker key.
+ writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path)
+ return
+ }
+ keyMarker = keyMarkerUnescaped
+ // A marker that does not share the prefix is not implemented.
+ if !strings.HasPrefix(keyMarker, prefix) {
+ writeErrorResponse(w, r, ErrNotImplemented, r.URL.Path)
+ return
+ }
 }

 listMultipartsInfo, err := api.ObjectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
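To make the new handler flow concrete, here is a minimal standalone sketch (illustrative only, not part of the patch; the prefix and marker values are invented): the key marker arrives percent-encoded in the query string, gets unescaped once at the handler boundary, and a marker that does not fall under the requested prefix is refused before the backend is consulted.

    package main

    import (
        "fmt"
        "net/url"
        "strings"
    )

    func main() {
        prefix := "2012/"
        keyMarker := "2012%2Fphotos%2Fparis" // as received in ?key-marker=
        unescaped, e := url.QueryUnescape(keyMarker)
        if e != nil {
            fmt.Println("bad encoding -> handler answers ErrNoSuchKey")
            return
        }
        if !strings.HasPrefix(unescaped, prefix) {
            fmt.Println("marker outside prefix -> handler answers ErrNotImplemented")
            return
        }
        fmt.Println("resume listing after:", unescaped) // prints "2012/photos/paris"
    }

Handling the unescape at the handler is also what lets this patch drop the url.QueryUnescape call, and with it the "net/url" import, from ListMultipartUploads in fs-multipart.go below.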
diff --git a/dir.go b/fs-multipart-dir.go
similarity index 99%
rename from dir.go
rename to fs-multipart-dir.go
index 3f8239193..36a953fe2 100644
--- a/dir.go
+++ b/fs-multipart-dir.go
@@ -135,7 +135,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
 timeoutCh := make(chan struct{}, 1)

 // TODO: check if bucketDir is absolute path
-
 scanDir := bucketDir
 dirDepth := bucketDir

@@ -145,7 +144,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
 if strings.HasSuffix(prefixPath, string(os.PathSeparator)) {
 tmpPrefixPath += string(os.PathSeparator)
 }
-
 prefixPath = tmpPrefixPath
 }

@@ -375,7 +373,6 @@ func (oic multipartObjectInfoChannel) IsClosed() bool {
 if oic.objInfo != nil {
 return false
 }
-
 return oic.closed
 }

diff --git a/fs-multipart.go b/fs-multipart.go
index 071cc270e..f70f5625a 100644
--- a/fs-multipart.go
+++ b/fs-multipart.go
@@ -23,7 +23,6 @@ import (
 "fmt"
 "io"
 "io/ioutil"
- "net/url"
 "os"
 "path/filepath"
 "strconv"
@@ -421,6 +420,42 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
 return newObject, nil
 }

+func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) {
+ fs.listMultipartObjectMapMutex.Lock()
+ defer fs.listMultipartObjectMapMutex.Unlock()
+
+ channels := []multipartObjectInfoChannel{ch}
+ if _, ok := fs.listMultipartObjectMap[params]; ok {
+ channels = append(fs.listMultipartObjectMap[params], ch)
+ }
+
+ fs.listMultipartObjectMap[params] = channels
+}
+
+func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel {
+ fs.listMultipartObjectMapMutex.Lock()
+ defer fs.listMultipartObjectMapMutex.Unlock()
+
+ if channels, ok := fs.listMultipartObjectMap[params]; ok {
+ for i, channel := range channels {
+ if !channel.IsTimedOut() {
+ chs := channels[i+1:]
+ if len(chs) > 0 {
+ fs.listMultipartObjectMap[params] = chs
+ } else {
+ delete(fs.listMultipartObjectMap, params)
+ }
+
+ return &channel
+ }
+ }
+
+ // As all channels are timed out, delete the map entry
+ delete(fs.listMultipartObjectMap, params)
+ }
+ return nil
+}
+
 // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
 func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) {
 result := ListMultipartsInfo{}
@@ -442,13 +477,6 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
 return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter))
 }

- // Unescape keyMarker string
- if tmpKeyMarker, err := url.QueryUnescape(keyMarker); err == nil {
- keyMarker = tmpKeyMarker
- } else {
- return result, probe.NewError(err)
- }
-
 if keyMarker != "" && !strings.HasPrefix(keyMarker, objectPrefix) {
 return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", keyMarker, objectPrefix))
 }
@@ -490,48 +518,48 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
 bucketDir := filepath.Join(fs.path, bucket)
 // If listMultipartObjectChannel is available for given parameters, then use it, else create new one
-
objectInfoCh := fs.popListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker}) - if objectInfoCh == nil { + multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker}) + if multipartObjectInfoCh == nil { ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) - objectInfoCh = &ch + multipartObjectInfoCh = &ch } nextKeyMarker := "" nextUploadIDMarker := "" for i := 0; i < maxUploads; { - objInfo, ok := objectInfoCh.Read() + multipartObjInfo, ok := multipartObjectInfoCh.Read() if !ok { // Closed channel. return result, nil } - if objInfo.Err != nil { - return ListMultipartsInfo{}, probe.NewError(objInfo.Err) + if multipartObjInfo.Err != nil { + return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err) } - if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") { + if strings.Contains(multipartObjInfo.Name, "$multiparts") || strings.Contains(multipartObjInfo.Name, "$tmpobject") { continue } - if objInfo.IsDir && skipDir { + if multipartObjInfo.IsDir && skipDir { continue } - if objInfo.IsDir { - result.CommonPrefixes = append(result.CommonPrefixes, objInfo.Name) + if multipartObjInfo.IsDir { + result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name) } else { - result.Uploads = append(result.Uploads, uploadMetadata{Object: objInfo.Name, UploadID: objInfo.UploadID, Initiated: objInfo.ModifiedTime}) + result.Uploads = append(result.Uploads, uploadMetadata{Object: multipartObjInfo.Name, UploadID: multipartObjInfo.UploadID, Initiated: multipartObjInfo.ModifiedTime}) } - nextKeyMarker = objInfo.Name - nextUploadIDMarker = objInfo.UploadID + nextKeyMarker = multipartObjInfo.Name + nextUploadIDMarker = multipartObjInfo.UploadID i++ } - if !objectInfoCh.IsClosed() { + if !multipartObjectInfoCh.IsClosed() { result.IsTruncated = true result.NextKeyMarker = nextKeyMarker result.NextUploadIDMarker = nextUploadIDMarker - fs.pushListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *objectInfoCh) + fs.saveListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *multipartObjectInfoCh) } return result, nil diff --git a/fs.go b/fs.go index 386725e4a..2f639eb94 100644 --- a/fs.go +++ b/fs.go @@ -50,43 +50,6 @@ type Filesystem struct { listMultipartObjectMapMutex *sync.Mutex } -func (fs *Filesystem) pushListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) { - fs.listMultipartObjectMapMutex.Lock() - defer fs.listMultipartObjectMapMutex.Unlock() - - channels := []multipartObjectInfoChannel{ch} - if _, ok := fs.listMultipartObjectMap[params]; ok { - channels = append(fs.listMultipartObjectMap[params], ch) - } - - fs.listMultipartObjectMap[params] = channels -} - -func (fs *Filesystem) popListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel { - fs.listMultipartObjectMapMutex.Lock() - defer fs.listMultipartObjectMapMutex.Unlock() - - if channels, ok := fs.listMultipartObjectMap[params]; ok { - for i, channel := range channels { - if !channel.IsTimedOut() { - chs := channels[i+1:] - if len(chs) > 0 { - fs.listMultipartObjectMap[params] = chs - } else { - delete(fs.listMultipartObjectMap, params) - } - - return &channel - } - } - - // As all channels are timed out, delete the map entry - 
delete(fs.listMultipartObjectMap, params) - } - - return nil -} - // newFS instantiate a new filesystem. func newFS(rootPath string) (ObjectAPI, *probe.Error) { fs := &Filesystem{ From 8986a6802ae0b4d80a634668a3bc2e617803d542 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 5 Apr 2016 15:08:59 -0700 Subject: [PATCH 4/4] Fix ListMultipartUploads 'mc ls -I' now works properly. --- api-response.go | 7 +++- fs-multipart.go | 86 ++++++++++++++++++++++++++++--------------------- 2 files changed, 55 insertions(+), 38 deletions(-) diff --git a/api-response.go b/api-response.go index e9c047673..11a3ba351 100644 --- a/api-response.go +++ b/api-response.go @@ -359,7 +359,12 @@ func generateListMultipartUploadsResponse(bucket string, multipartsInfo ListMult listMultipartUploadsResponse.MaxUploads = multipartsInfo.MaxUploads listMultipartUploadsResponse.NextUploadIDMarker = multipartsInfo.NextUploadIDMarker listMultipartUploadsResponse.UploadIDMarker = multipartsInfo.UploadIDMarker - + listMultipartUploadsResponse.CommonPrefixes = make([]CommonPrefix, len(multipartsInfo.CommonPrefixes)) + for index, commonPrefix := range multipartsInfo.CommonPrefixes { + listMultipartUploadsResponse.CommonPrefixes[index] = CommonPrefix{ + Prefix: commonPrefix, + } + } listMultipartUploadsResponse.Uploads = make([]Upload, len(multipartsInfo.Uploads)) for index, upload := range multipartsInfo.Uploads { newUpload := Upload{} diff --git a/fs-multipart.go b/fs-multipart.go index f70f5625a..43ba56151 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -57,7 +57,14 @@ func removeFileTree(fileName string, level string) error { return nil } -func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error { +func safeRemoveFile(file *os.File) error { + if e := file.Close(); e != nil { + return e + } + return os.Remove(file.Name()) +} + +func safeWriteFile(fileName string, data io.Reader, size int64, md5sum string) error { tempFile, e := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-") if e != nil { return e @@ -66,8 +73,7 @@ func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error md5Hasher := md5.New() multiWriter := io.MultiWriter(md5Hasher, tempFile) if _, e := io.CopyN(multiWriter, data, size); e != nil { - tempFile.Close() - os.Remove(tempFile.Name()) + safeRemoveFile(tempFile) return e } tempFile.Close() @@ -284,8 +290,8 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i return "", probe.NewError(e) } - partFile := filepath.Join(fs.path, configDir, bucket, object, uploadID+"."+strconv.Itoa(partNumber)+"."+md5Hex) - if e := safeWrite(partFile, data, size, md5Hex); e != nil { + partFile := filepath.Join(fs.path, configDir, bucket, object, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5Hex)) + if e := safeWriteFile(partFile, data, size, md5Hex); e != nil { return "", probe.NewError(e) } @@ -356,34 +362,30 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa tempFile, e := ioutil.TempFile(metaObjectDir, uploadID+".complete.") if e != nil { - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) return ObjectInfo{}, probe.NewError(e) } - for _, part := range parts { partNumber := part.PartNumber - md5sum := strings.Trim(part.ETag, "\"") - partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum) - var f *os.File - f, e = os.Open(partFile) + // Trim off the odd double quotes from ETag in the beginning and end. 
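+ // (The ETag echoed back in the complete-multipart request is the quoted
+ // form handed out when the part was uploaded, while the part file on disk
+ // is named with the bare md5 hex, so the quotes must be stripped before
+ // the file name is rebuilt below.)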
+ md5sum := strings.TrimPrefix(part.ETag, "\"") + md5sum = strings.TrimSuffix(md5sum, "\"") + partFileStr := filepath.Join(metaObjectDir, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5sum)) + var partFile *os.File + partFile, e = os.Open(partFileStr) if e != nil { - tempFile.Close() - os.Remove(tempFile.Name()) - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) + safeRemoveFile(tempFile) return ObjectInfo{}, probe.NewError(e) - } else if _, e = io.Copy(tempFile, f); e != nil { - tempFile.Close() - os.Remove(tempFile.Name()) - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) + } else if _, e = io.Copy(tempFile, partFile); e != nil { + safeRemoveFile(tempFile) return ObjectInfo{}, probe.NewError(e) } - f.Close() + partFile.Close() // Close part file after successful copy. } tempFile.Close() - // fi is used later - fi, e := os.Stat(tempFile.Name()) + + // Stat to gather fresh stat info. + objSt, e := os.Stat(tempFile.Name()) if e != nil { - os.Remove(tempFile.Name()) return ObjectInfo{}, probe.NewError(e) } @@ -391,7 +393,6 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa objectPath := filepath.Join(bucketPath, object) if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil { os.Remove(tempFile.Name()) - //return ObjectInfo{}, probe.NewError(InternalError{Err: err}) return ObjectInfo{}, probe.NewError(e) } if e = os.Rename(tempFile.Name(), objectPath); e != nil { @@ -411,8 +412,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa newObject := ObjectInfo{ Bucket: bucket, Name: object, - ModifiedTime: fi.ModTime(), - Size: fi.Size(), + ModifiedTime: objSt.ModTime(), + Size: objSt.Size(), ContentType: contentType, MD5Sum: s3MD5, } @@ -482,18 +483,14 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa } markerPath := filepath.FromSlash(keyMarker) - if uploadIDMarker != "" { if strings.HasSuffix(markerPath, string(os.PathSeparator)) { return result, probe.NewError(fmt.Errorf("Invalid combination of uploadID marker '%s' and marker '%s'", uploadIDMarker, keyMarker)) } - - id, err := uuid.Parse(uploadIDMarker) - - if err != nil { - return result, probe.NewError(err) + id, e := uuid.Parse(uploadIDMarker) + if e != nil { + return result, probe.NewError(e) } - if id.IsZero() { return result, probe.NewError(fmt.Errorf("Invalid upload ID marker %s", uploadIDMarker)) } @@ -516,9 +513,16 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa recursive = false } - bucketDir := filepath.Join(fs.path, bucket) - // If listMultipartObjectChannel is available for given parameters, then use it, else create new one - multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker}) + bucketDir := filepath.Join(fs.path, configDir, bucket) + // Lookup of if listMultipartObjectChannel is available for given + // parameters, else create a new one. 
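+ // Reusing a saved channel lets a paginated listing resume the directory
+ // scan where the previous page stopped, instead of re-running
+ // scanMultipartDir from the top; the channel is saved again via
+ // saveListMultipartObjectCh whenever the listing comes back truncated.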
+ multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{ + bucket: bucket, + delimiter: delimiter, + keyMarker: markerPath, + prefix: prefixPath, + uploadIDMarker: uploadIDMarker, + }) if multipartObjectInfoCh == nil { ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive) multipartObjectInfoCh = &ch @@ -534,10 +538,14 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa } if multipartObjInfo.Err != nil { + if os.IsNotExist(multipartObjInfo.Err) { + return ListMultipartsInfo{}, nil + } return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err) } - if strings.Contains(multipartObjInfo.Name, "$multiparts") || strings.Contains(multipartObjInfo.Name, "$tmpobject") { + if strings.Contains(multipartObjInfo.Name, "$multiparts") || + strings.Contains(multipartObjInfo.Name, "$tmpobject") { continue } @@ -548,7 +556,11 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa if multipartObjInfo.IsDir { result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name) } else { - result.Uploads = append(result.Uploads, uploadMetadata{Object: multipartObjInfo.Name, UploadID: multipartObjInfo.UploadID, Initiated: multipartObjInfo.ModifiedTime}) + result.Uploads = append(result.Uploads, uploadMetadata{ + Object: multipartObjInfo.Name, + UploadID: multipartObjInfo.UploadID, + Initiated: multipartObjInfo.ModifiedTime, + }) } nextKeyMarker = multipartObjInfo.Name nextUploadIDMarker = multipartObjInfo.UploadID