diff --git a/pkg/storage/drivers/fs/fs_multipart.go b/pkg/storage/drivers/fs/fs_multipart.go index 779787504..e2f89878a 100644 --- a/pkg/storage/drivers/fs/fs_multipart.go +++ b/pkg/storage/drivers/fs/fs_multipart.go @@ -36,6 +36,69 @@ type Multiparts struct { ActiveSession map[string]*MultipartSession } +func (fs *fsDriver) loadActiveSessions(bucket string) { + bucketPath := path.Join(fs.root, bucket) + _, err := os.Stat(bucketPath) + if err != nil { + return + } + activeSessionFile, err := os.OpenFile(bucketPath+"$activeSession", os.O_RDONLY, 0600) + if err != nil { + return + } + defer activeSessionFile.Close() + var deserializedActiveSession map[string]*MultipartSession + decoder := json.NewDecoder(activeSessionFile) + err = decoder.Decode(&deserializedActiveSession) + if err != nil { + return + } + for key, value := range deserializedActiveSession { + fs.multiparts.ActiveSession[key] = value + } + return +} + +func (fs *fsDriver) isValidUploadID(key, uploadID string) bool { + s, ok := fs.multiparts.ActiveSession[key] + if !ok { + return false + } + if uploadID == s.UploadID { + return true + } + return false +} + +func (fs *fsDriver) writePart(objectPath string, partID int, size int64, data io.Reader) (drivers.PartMetadata, error) { + partPath := objectPath + fmt.Sprintf("$%d", partID) + // write part + partFile, err := os.OpenFile(partPath, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return drivers.PartMetadata{}, iodine.New(err, nil) + } + defer partFile.Close() + + h := md5.New() + mw := io.MultiWriter(partFile, h) + + _, err = io.CopyN(mw, data, size) + if err != nil { + return drivers.PartMetadata{}, iodine.New(err, nil) + } + + fi, err := os.Stat(partPath) + if err != nil { + return drivers.PartMetadata{}, iodine.New(err, nil) + } + partMetadata := drivers.PartMetadata{} + partMetadata.ETag = hex.EncodeToString(h.Sum(nil)) + partMetadata.PartNumber = partID + partMetadata.Size = fi.Size() + partMetadata.LastModified = fi.ModTime() + return partMetadata, nil +} + // byKey is a sortable interface for UploadMetadata slice type byKey []*drivers.UploadMetadata @@ -51,7 +114,6 @@ func (fs *fsDriver) ListMultipartUploads(bucket string, resources drivers.Bucket } bucketPath := path.Join(fs.root, bucket) _, err := os.Stat(bucketPath) - // check bucket exists if os.IsNotExist(err) { return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) @@ -59,27 +121,11 @@ func (fs *fsDriver) ListMultipartUploads(bucket string, resources drivers.Bucket if err != nil { return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.InternalError{}, nil) } + // load from disk + fs.loadActiveSessions(bucket) - if _, err := os.Stat(bucketPath + "$activeSession"); os.IsNotExist(err) { - return drivers.BucketMultipartResourcesMetadata{}, nil - } - if err != nil { - return drivers.BucketMultipartResourcesMetadata{}, iodine.New(err, nil) - } - activeSessionFile, err := os.OpenFile(bucketPath+"$activeSession", os.O_RDONLY, 0600) - if err != nil { - return drivers.BucketMultipartResourcesMetadata{}, iodine.New(err, nil) - } - defer activeSessionFile.Close() - - var deserializedActiveSession map[string]*MultipartSession - decoder := json.NewDecoder(activeSessionFile) - err = decoder.Decode(&deserializedActiveSession) - if err != nil { - return drivers.BucketMultipartResourcesMetadata{}, iodine.New(err, nil) - } var uploads []*drivers.UploadMetadata - for key, session := range deserializedActiveSession { + for key, session := range fs.multiparts.ActiveSession { if strings.HasPrefix(key, resources.Prefix) { if len(uploads) > resources.MaxUploads { sort.Sort(byKey(uploads)) @@ -123,6 +169,34 @@ func (fs *fsDriver) ListMultipartUploads(bucket string, resources drivers.Bucket return resources, nil } +func (fs *fsDriver) concatParts(parts map[int]string, objectPath string, mw io.Writer) error { + for i := 1; i <= len(parts); i++ { + recvMD5 := parts[i] + partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", i), os.O_RDONLY, 0600) + if err != nil { + return iodine.New(err, nil) + } + obj, err := ioutil.ReadAll(partFile) + if err != nil { + return iodine.New(err, nil) + } + calcMD5Bytes := md5.Sum(obj) + // complete multi part request header md5sum per part is hex encoded + recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) + if err != nil { + return iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil) + } + if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { + return iodine.New(drivers.BadDigest{Md5: recvMD5}, nil) + } + _, err = io.Copy(mw, bytes.NewBuffer(obj)) + if err != nil { + return iodine.New(err, nil) + } + } + return nil +} + func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { fs.lock.Lock() defer fs.lock.Unlock() @@ -160,13 +234,15 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, } var activeSessionFile *os.File - if _, err := os.Stat(bucketPath + "$activeSession"); os.IsNotExist(err) { + _, err = os.Stat(bucketPath + "$activeSession") + switch { + case os.IsNotExist(err): activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY|os.O_CREATE, 0600) if err != nil { return "", iodine.New(err, nil) } - } else { - activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY, 0600) + default: + activeSessionFile, err = os.OpenFile(bucketPath+"$activeSession", os.O_WRONLY|os.O_APPEND, 0600) if err != nil { return "", iodine.New(err, nil) } @@ -177,11 +253,11 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] - file, err := os.OpenFile(objectPath+"$multiparts", os.O_WRONLY|os.O_CREATE, 0600) + multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_WRONLY|os.O_CREATE, 0600) if err != nil { return "", iodine.New(err, nil) } - defer file.Close() + defer multiPartfile.Close() mpartSession := new(MultipartSession) mpartSession.TotalParts = 0 @@ -191,7 +267,7 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, mpartSession.Parts = parts fs.multiparts.ActiveSession[key] = mpartSession - encoder := json.NewEncoder(file) + encoder := json.NewEncoder(multiPartfile) err = encoder.Encode(mpartSession) if err != nil { return "", iodine.New(err, nil) @@ -205,46 +281,6 @@ func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, return uploadID, nil } -func (fs *fsDriver) isValidUploadID(key, uploadID string) bool { - s, ok := fs.multiparts.ActiveSession[key] - if !ok { - return false - } - if uploadID == s.UploadID { - return true - } - return false -} - -func (fs *fsDriver) writePart(objectPath string, partID int, size int64, data io.Reader) (drivers.PartMetadata, error) { - partPath := objectPath + fmt.Sprintf("$%d", partID) - // write part - partFile, err := os.OpenFile(partPath, os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - return drivers.PartMetadata{}, iodine.New(err, nil) - } - defer partFile.Close() - - h := md5.New() - mw := io.MultiWriter(partFile, h) - - _, err = io.CopyN(mw, data, size) - if err != nil { - return drivers.PartMetadata{}, iodine.New(err, nil) - } - - fi, err := os.Stat(partPath) - if err != nil { - return drivers.PartMetadata{}, iodine.New(err, nil) - } - partMetadata := drivers.PartMetadata{} - partMetadata.ETag = hex.EncodeToString(h.Sum(nil)) - partMetadata.PartNumber = partID - partMetadata.Size = fi.Size() - partMetadata.LastModified = fi.ModTime() - return partMetadata, nil -} - // partNumber is a sortable interface for Part slice type partNumber []*drivers.PartMetadata @@ -321,7 +357,7 @@ func (fs *fsDriver) CreateObjectPart(bucket, key, uploadID string, partID int, c } } - multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR, 0600) + multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR|os.O_APPEND, 0600) if err != nil { return "", iodine.New(err, nil) } @@ -346,34 +382,6 @@ func (fs *fsDriver) CreateObjectPart(bucket, key, uploadID string, partID int, c return partMetadata.ETag, nil } -func (fs *fsDriver) concatParts(parts map[int]string, objectPath string, mw io.Writer) error { - for i := 1; i <= len(parts); i++ { - recvMD5 := parts[i] - partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", i), os.O_RDONLY, 0600) - if err != nil { - return iodine.New(err, nil) - } - obj, err := ioutil.ReadAll(partFile) - if err != nil { - return iodine.New(err, nil) - } - calcMD5Bytes := md5.Sum(obj) - // complete multi part request header md5sum per part is hex encoded - recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) - if err != nil { - return iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil) - } - if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { - return iodine.New(drivers.BadDigest{Md5: recvMD5}, nil) - } - _, err = io.Copy(mw, bytes.NewBuffer(obj)) - if err != nil { - return iodine.New(err, nil) - } - } - return nil -} - func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { fs.lock.Lock() defer fs.lock.Unlock() @@ -470,6 +478,9 @@ func (fs *fsDriver) ListObjectParts(bucket, key string, resources drivers.Object fs.lock.Lock() defer fs.lock.Unlock() + // load from disk + fs.loadActiveSessions(bucket) + // check bucket name valid if drivers.IsValidBucket(bucket) == false { return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) @@ -506,7 +517,7 @@ func (fs *fsDriver) ListObjectParts(bucket, key string, resources drivers.Object } objectPath := path.Join(bucketPath, key) - multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR, 0600) + multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDONLY, 0600) if err != nil { return drivers.ObjectResourcesMetadata{}, iodine.New(err, nil) }